diff --git a/storage/gc_queue.go b/storage/gc_queue.go index cd75a5600ff1..d331317d976d 100644 --- a/storage/gc_queue.go +++ b/storage/gc_queue.go @@ -283,7 +283,8 @@ func (gcq *gcQueue) process(now roachpb.Timestamp, repl *Replica, } } - if pErr := repl.resolveIntents(repl.context(), intents, true /* wait */, false /* !poison */); pErr != nil { + if pErr := repl.store.intentResolver.resolveIntents(repl.context(), repl, intents, + true /* wait */, false /* !poison */); pErr != nil { return pErr.GoError() } @@ -346,14 +347,14 @@ func processTransactionTable(r *Replica, txnMap map[uuid.UUID]*roachpb.Transacti // Note: Most aborted transaction weren't aborted by their client, // but instead by the coordinator - those will not have any intents // persisted, though they still might exist in the system. - if err := r.resolveIntents(r.context(), + if err := r.store.intentResolver.resolveIntents(r.context(), r, roachpb.AsIntents(txn.Intents, &txn), true /* wait */, false /* !poison */); err != nil { log.Warningf("failed to resolve intents of aborted txn on gc: %s", err) } case roachpb.COMMITTED: // It's committed, so it doesn't need a push but we can only // GC it after its intents are resolved. - if err := r.resolveIntents(r.context(), + if err := r.store.intentResolver.resolveIntents(r.context(), r, roachpb.AsIntents(txn.Intents, &txn), true /* wait */, false /* !poison */); err != nil { log.Warningf("unable to resolve intents of committed txn on gc: %s", err) // Returning the error here would abort the whole GC run, and diff --git a/storage/intent_resolver.go b/storage/intent_resolver.go new file mode 100644 index 000000000000..93b5b3a8cc60 --- /dev/null +++ b/storage/intent_resolver.go @@ -0,0 +1,447 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. See the AUTHORS file +// for names of contributors. +// +// Author: Ben Darnell + +package storage + +import ( + "fmt" + "sync" + + "github.com/cockroachdb/cockroach/base" + "github.com/cockroachdb/cockroach/client" + "github.com/cockroachdb/cockroach/keys" + "github.com/cockroachdb/cockroach/roachpb" + "github.com/cockroachdb/cockroach/util/log" + "github.com/cockroachdb/cockroach/util/tracing" + "github.com/cockroachdb/cockroach/util/uuid" + "github.com/opentracing/opentracing-go" + "golang.org/x/net/context" +) + +// intentResolverTaskLimit is the maximum number of asynchronous tasks +// that may be started by intentResolver. When this limit is reached +// asynchronous tasks will start to block to apply backpressure. +// This is a last line of defense against issues like #4925. +// TODO(bdarnell): how to determine best value? +const intentResolverTaskLimit = 100 + +// intentResolver manages the process of pushing transactions and +// resolving intents. +type intentResolver struct { + store *Store + + sem chan struct{} // Semaphore to limit async goroutines. + + mu struct { + sync.Mutex + // Maps transaction ids to a refcount. 
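+ // A transaction ID is present while a push of that transaction is
+ // in flight; maybePushTransactions uses this to skip transactions
+ // that are already being pushed when skipIfInFlight is set.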
+ inFlight map[uuid.UUID]int + } +} + +func newIntentResolver(store *Store) *intentResolver { + ir := &intentResolver{ + store: store, + sem: make(chan struct{}, intentResolverTaskLimit), + } + ir.mu.inFlight = map[uuid.UUID]int{} + return ir +} + +// processWriteIntentError tries to push the conflicting +// transaction(s) responsible for the given WriteIntentError, and to +// resolve those intents if possible. Returns a new error to be used +// in place of the original. +// +// The returned error may be a copy of the original WriteIntentError, +// with or without the Resolved flag set, which governs the client's +// retry behavior (if the transaction is pushed, the Resolved flag is +// set to tell the client to retry immediately; otherwise it is false +// to cause the client to back off). +func (ir *intentResolver) processWriteIntentError(ctx context.Context, + wiErr roachpb.WriteIntentError, r *Replica, args roachpb.Request, h roachpb.Header, + pushType roachpb.PushTxnType) *roachpb.Error { + + if log.V(6) { + log.Infoc(ctx, "resolving write intent %s", wiErr) + } + + method := args.Method() + readOnly := roachpb.IsReadOnly(args) // TODO(tschottdorf): pass as param + + resolveIntents, pushErr := ir.maybePushTransactions(ctx, wiErr.Intents, h, pushType, false) + if resErr := ir.resolveIntents(ctx, r, resolveIntents, + false /* !wait */, true /* poison */); resErr != nil { + // When resolving without waiting, errors should not + // usually be returned here, although there are some cases + // when they may be (especially when a test cluster is in + // the process of shutting down). + log.Warningf("asynchronous resolveIntents failed: %s", resErr) + } + + if pushErr != nil { + if log.V(1) { + log.Infoc(ctx, "on %s: %s", method, pushErr) + } + + // For write/write conflicts within a transaction, propagate the + // push failure, not the original write intent error. The push + // failure will instruct the client to restart the transaction + // with a backoff. + if h.Txn != nil && h.Txn.ID != nil && !readOnly { + return pushErr + } + + // For read/write conflicts, and non-transactional write/write + // conflicts, return the write intent error which engages + // backoff/retry (with !Resolved). We don't need to restart the + // txn, only resend the read with a backoff. + return roachpb.NewError(&wiErr) + } + + // We pushed all transactions, so tell the client everything's + // resolved and it can retry immediately. + wiErr.Resolved = true + return roachpb.NewError(&wiErr) +} + +// maybePushTransaction tries to push the conflicting transaction(s) +// responsible for the given intents: either move its +// timestamp forward on a read/write conflict, abort it on a +// write/write conflict, or do nothing if the transaction is no longer +// pending. +// +// Returns a slice of intents which can now be resolved, and an error. +// The returned intents should be resolved via +// intentResolver.resolveIntents regardless of any error returned by +// maybePushTransaction, but if the error is non-nil then some of the +// conflicting transactions may still be pending. +// +// If skipIfInFlight is true, then no PushTxns will be sent and no +// intents will be returned for any transaction for which there is +// another push in progress. This should only be used by callers who +// are not relying on the side effect of a push (i.e. only +// pushType==PUSH_TOUCH), and who also don't need to synchronize with +// the resolution of those intents (e.g. 
asynchronous resolutions of +// intents skipped on inconsistent reads). +// +// Callers are involved with +// a) conflict resolution for commands being executed at the Store with the +// client waiting, +// b) resolving intents encountered during inconsistent operations, and +// c) resolving intents upon EndTransaction which are not local to the given +// range. This is the only path in which the transaction is going to be +// in non-pending state and doesn't require a push. +func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []roachpb.Intent, + h roachpb.Header, pushType roachpb.PushTxnType, skipIfInFlight bool) ( + []roachpb.Intent, *roachpb.Error) { + + now := ir.store.Clock().Now() + + pusherTxn := h.Txn + // If there's no pusher, we communicate a priority by sending an empty + // txn with only the priority set. + if pusherTxn == nil { + pusherTxn = &roachpb.Transaction{ + Priority: roachpb.MakePriority(h.UserPriority), + } + } + + sp, cleanupSp := tracing.SpanFromContext(opStore, ir.store.Tracer(), ctx) + defer cleanupSp() + sp.LogEvent("intent resolution") + + // Split intents into those we need to push and those which are good to + // resolve. + ir.mu.Lock() + // TODO(tschottdorf): can optimize this and use same underlying slice. + var pushIntents, resolveIntents []roachpb.Intent + for _, intent := range intents { + if intent.Status != roachpb.PENDING { + // The current intent does not need conflict resolution + // because the transaction is already finalized. + // TODO(bdarnell): can this happen any more? + resolveIntents = append(resolveIntents, intent) + } else if _, ok := ir.mu.inFlight[*intent.Txn.ID]; ok && skipIfInFlight { + // Another goroutine is working on this transaction so we can + // skip it. + if log.V(1) { + log.Infof("skipping PushTxn for %s; attempt already in flight", intent.Txn.ID) + } + continue + } else { + pushIntents = append(pushIntents, intent) + ir.mu.inFlight[*intent.Txn.ID]++ + } + } + ir.mu.Unlock() + + // Attempt to push the transaction(s) which created the conflicting intent(s). + var pushReqs []roachpb.Request + for _, intent := range pushIntents { + pushReqs = append(pushReqs, &roachpb.PushTxnRequest{ + Span: roachpb.Span{ + Key: intent.Txn.Key, + }, + PusherTxn: *pusherTxn, + PusheeTxn: intent.Txn, + PushTo: h.Timestamp, + // The timestamp is used by PushTxn for figuring out whether the + // transaction is abandoned. If we used the argument's timestamp + // here, we would run into busy loops because that timestamp + // usually stays fixed among retries, so it will never realize + // that a transaction has timed out. See #877. + Now: now, + PushType: pushType, + }) + } + // TODO(kaneda): Set the transaction in the header so that the + // txn is correctly propagated in an error response. + b := &client.Batch{} + b.InternalAddRequest(pushReqs...) + br, err := ir.store.db.RunWithResponse(b) + ir.mu.Lock() + for _, intent := range pushIntents { + ir.mu.inFlight[*intent.Txn.ID]-- + if ir.mu.inFlight[*intent.Txn.ID] == 0 { + delete(ir.mu.inFlight, *intent.Txn.ID) + } + } + ir.mu.Unlock() + if err != nil { + // TODO(bdarnell): return resolveIntents even on error. 
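+ // Until then, intents gathered above as already resolvable are
+ // dropped when the push batch fails; they will be encountered and
+ // resolved again by a later caller.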
+ return nil, err + } + + for i, intent := range pushIntents { + pushee := br.Responses[i].GetInner().(*roachpb.PushTxnResponse).PusheeTxn + intent.Txn = pushee.TxnMeta + intent.Status = pushee.Status + resolveIntents = append(resolveIntents, intent) + } + return resolveIntents, nil +} + +// processIntentsAsync asynchronously processes intents which were +// encountered during another command but did not interfere with the +// execution of that command. This occurs in two cases: inconsistent +// reads and EndTransaction (which queues its own external intents for +// processing via this method). The two cases are handled somewhat +// differently and would be better served by different entry points, +// but combining them simplifies the plumbing necessary in Replica. +func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithArg) { + if len(intents) == 0 { + return + } + now := r.store.Clock().Now() + ctx := r.context() + stopper := r.store.Stopper() + + for _, item := range intents { + if item.args.Method() != roachpb.EndTransaction { + stopper.RunLimitedAsyncTask(ir.sem, func() { + // Everything here is best effort; give up rather than waiting + // too long (helps avoid deadlocks during test shutdown, + // although this is imperfect due to the use of an + // uninterruptible WaitGroup.Wait in beginCmds). + ctxWithTimeout, cancel := context.WithTimeout(ctx, base.NetworkTimeout) + defer cancel() + h := roachpb.Header{Timestamp: now} + resolveIntents, pushErr := ir.maybePushTransactions(ctxWithTimeout, + item.intents, h, roachpb.PUSH_TOUCH, true /* skipInFlight */) + if pErr := ir.resolveIntents(ctxWithTimeout, r, resolveIntents, + true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { + log.Warningc(ctxWithTimeout, "failed to resolve intents: %s", pErr) + return + } + if pushErr != nil { + log.Warningc(ctxWithTimeout, "failed to push during intent resolution: %s", pushErr) + return + } + }) + } else { // EndTransaction + stopper.RunLimitedAsyncTask(ir.sem, func() { + ctxWithTimeout, cancel := context.WithTimeout(ctx, base.NetworkTimeout) + defer cancel() + + // For EndTransaction, we know the transaction is finalized so + // we can skip the push and go straight to the resolve. + if pErr := ir.resolveIntents(ctxWithTimeout, r, item.intents, + true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { + log.Warningc(ctxWithTimeout, "failed to resolve intents: %s", pErr) + return + } + + // We successfully resolved the intents, so we're able to GC from + // the txn span directly. Note that the sequence cache was cleared + // out synchronously with EndTransaction (see comments within for + // an explanation of why that is kosher). + // + // Note that we poisoned the sequence caches on the external ranges + // above. This may seem counter-intuitive, but it's actually + // necessary: Assume a transaction has committed here, with two + // external intents, and assume that we did not poison. Normally, + // these two intents would be resolved in the same batch, but that + // is not guaranteed (for example, if DistSender has a stale + // descriptor after a Merge). When resolved separately, the first + // ResolveIntent would clear out the sequence cache; an individual + // write on the second (still present) intent could then be + // replayed and would resolve to a real value (at least for a + // window of time unless we delete the local txn entry). That's not + // OK for non-idempotent commands such as Increment. 
+ // TODO(tschottdorf): We should have another side effect on + // MVCCResolveIntent (on commit/abort): If it were able to remove + // the txn from its corresponding entries in the timestamp cache, + // no more replays at the same timestamp would be possible. This + // appears to be a useful performance optimization; we could then + // not poison on EndTransaction. In fact, the above mechanism + // could be an effective alternative to sequence-cache based + // poisoning (or the whole sequence cache?) itself. + // + // TODO(tschottdorf): down the road, can probably unclog the system + // here by batching up a bunch of those GCRequests before proposing. + var ba roachpb.BatchRequest + txn := item.intents[0].Txn + gcArgs := roachpb.GCRequest{ + Span: roachpb.Span{ + Key: r.Desc().StartKey.AsRawKey(), + EndKey: r.Desc().EndKey.AsRawKey(), + }, + } + gcArgs.Keys = append(gcArgs.Keys, roachpb.GCRequest_GCKey{ + Key: keys.TransactionKey(txn.Key, txn.ID), + }) + ba.Add(&gcArgs) + if _, pErr := r.addWriteCmd(ctxWithTimeout, ba, nil /* nil */); pErr != nil { + log.Warningf("could not GC completed transaction: %s", pErr) + } + }) + } + } +} + +// resolveIntents resolves the given intents. For those which are +// local to the range, we submit directly to the local Raft instance; +// all non-local intents are resolved asynchronously in a batch. If +// `wait` is true, all operations are carried out synchronously and an +// error is returned. Otherwise, the call returns without error as +// soon as all local resolve commands have been **proposed** (not +// executed). This ensures that if a waiting client retries +// immediately after calling this function, it will not hit the same +// intents again. +func (ir *intentResolver) resolveIntents(ctx context.Context, r *Replica, + intents []roachpb.Intent, wait bool, poison bool) *roachpb.Error { + + sp, cleanupSp := tracing.SpanFromContext(opReplica, ir.store.Tracer(), ctx) + defer cleanupSp() + + // We're doing async stuff below; those need new traces. + ctx = opentracing.ContextWithSpan(ctx, nil) + sp.LogEvent(fmt.Sprintf("resolving intents [wait=%t]", wait)) + + var reqsRemote []roachpb.Request + baLocal := roachpb.BatchRequest{} + for i := range intents { + intent := intents[i] // avoids a race in `i, intent := range ...` + var resolveArgs roachpb.Request + var local bool // whether this intent lives on this Range + { + if len(intent.EndKey) == 0 { + resolveArgs = &roachpb.ResolveIntentRequest{ + Span: intent.Span, + IntentTxn: intent.Txn, + Status: intent.Status, + Poison: poison, + } + local = r.ContainsKey(intent.Key) + } else { + resolveArgs = &roachpb.ResolveIntentRangeRequest{ + Span: intent.Span, + IntentTxn: intent.Txn, + Status: intent.Status, + Poison: poison, + } + local = r.ContainsKeyRange(intent.Key, intent.EndKey) + } + } + + // If the intent isn't (completely) local, we'll need to send an external request. + // We'll batch them all up and send at the end. + if local { + baLocal.Add(resolveArgs) + } else { + reqsRemote = append(reqsRemote, resolveArgs) + } + } + + // The local batch goes directly to Raft. + var wg sync.WaitGroup + if len(baLocal.Requests) > 0 { + action := func() *roachpb.Error { + // Trace this under the ID of the intent owner. + sp := r.store.Tracer().StartSpan("resolve intents") + defer sp.Finish() + ctx = opentracing.ContextWithSpan(ctx, sp) + // Always operate with a timeout when resolving intents: this + // prevents rare shutdown timeouts in tests. 
+ ctxWithTimeout, cancel := context.WithTimeout(ctx, base.NetworkTimeout) + defer cancel() + _, pErr := r.addWriteCmd(ctxWithTimeout, baLocal, &wg) + return pErr + } + wg.Add(1) + if wait || !r.store.Stopper().RunLimitedAsyncTask(ir.sem, func() { + if err := action(); err != nil { + log.Warningf("unable to resolve local intents; %s", err) + } + }) { + // Still run the task when draining. Our caller already has a task and + // going async here again is merely for performance, but some intents + // need to be resolved because they might block other tasks. See #1684. + // Note that handleSkippedIntents has a TODO in case #1684 comes back. + if err := action(); err != nil { + return err + } + } + } + + // Resolve all of the intents which aren't local to the Range. + if len(reqsRemote) > 0 { + b := &client.Batch{} + b.InternalAddRequest(reqsRemote...) + action := func() *roachpb.Error { + // TODO(tschottdorf): no tracing here yet. + return r.store.DB().Run(b) + } + if wait || !r.store.Stopper().RunLimitedAsyncTask(ir.sem, func() { + if err := action(); err != nil { + log.Warningf("unable to resolve external intents: %s", err) + } + }) { + // As with local intents, try async to not keep the caller waiting, but + // when draining just go ahead and do it synchronously. See #1684. + if err := action(); err != nil { + return err + } + } + } + + // Wait until the local ResolveIntents batch has been submitted to + // raft. No-op if all were non-local. + wg.Wait() + return nil +} diff --git a/storage/replica.go b/storage/replica.go index db5e7fdf38eb..da1798fc2516 100644 --- a/storage/replica.go +++ b/storage/replica.go @@ -35,7 +35,6 @@ import ( opentracing "github.com/opentracing/opentracing-go" "golang.org/x/net/context" - "github.com/cockroachdb/cockroach/base" "github.com/cockroachdb/cockroach/client" "github.com/cockroachdb/cockroach/config" "github.com/cockroachdb/cockroach/gossip" @@ -930,7 +929,7 @@ func (r *Replica) addReadOnlyCmd(ctx context.Context, ba roachpb.BatchRequest) ( // conditions as described in #2231. pErr = r.checkSequenceCache(r.store.Engine(), *ba.Txn) } - r.handleSkippedIntents(intents) + r.store.intentResolver.processIntentsAsync(r, intents) return br, pErr } @@ -1443,7 +1442,7 @@ func (r *Replica) applyRaftCommand(ctx context.Context, index uint64, originRepl // On the replica on which this command originated, resolve skipped intents // asynchronously - even on failure. if originReplica.StoreID == r.store.StoreID() { - r.handleSkippedIntents(intents) + r.store.intentResolver.processIntentsAsync(r, intents) } return br, rErr @@ -1828,86 +1827,6 @@ func (r *Replica) maybeGossipSystemConfig() { r.systemDBHash = hash } -func (r *Replica) handleSkippedIntents(intents []intentsWithArg) { - if len(intents) == 0 { - return - } - now := r.store.Clock().Now() - ctx := r.context() - stopper := r.store.Stopper() - - for _, item := range intents { - // TODO(tschottdorf): avoid data race related to batch unrolling in ExecuteCmd; - // can probably go again when that provisional code there is gone. Should - // still be careful though, a retry could happen and race with args. - args := util.CloneProto(item.args).(roachpb.Request) - stopper.RunAsyncTask(func() { - // Everything here is best effort; give up rather than waiting - // too long (helps avoid deadlocks during test shutdown, - // although this is imperfect due to the use of an - // uninterruptible WaitGroup.Wait in beginCmds). 
- ctxWithTimeout, cancel := context.WithTimeout(ctx, base.NetworkTimeout) - defer cancel() - h := roachpb.Header{Timestamp: now} - resolveIntents, pErr := r.store.resolveWriteIntentError(ctxWithTimeout, &roachpb.WriteIntentError{ - Intents: item.intents, - }, r, args, h, roachpb.PUSH_TOUCH) - if wiErr, ok := pErr.GetDetail().(*roachpb.WriteIntentError); !ok || wiErr == nil || !wiErr.Resolved { - log.Warningc(ctxWithTimeout, "failed to push during intent resolution: %s", pErr) - return - } - if pErr := r.resolveIntents(ctxWithTimeout, resolveIntents, true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { - log.Warningc(ctxWithTimeout, "failed to resolve intents: %s", pErr) - return - } - // We successfully resolved the intents, so we're able to GC from - // the txn span directly. Note that the sequence cache was cleared - // out synchronously with EndTransaction (see comments within for - // an explanation of why that is kosher). - // - // Note that we poisoned the sequence caches on the external ranges - // above. This may seem counter-intuitive, but it's actually - // necessary: Assume a transaction has committed here, with two - // external intents, and assume that we did not poison. Normally, - // these two intents would be resolved in the same batch, but that - // is not guaranteed (for example, if DistSender has a stale - // descriptor after a Merge). When resolved separately, the first - // ResolveIntent would clear out the sequence cache; an individual - // write on the second (still present) intent could then be - // replayed and would resolve to a real value (at least for a - // window of time unless we delete the local txn entry). That's not - // OK for non-idempotent commands such as Increment. - // TODO(tschottdorf): We should have another side effect on - // MVCCResolveIntent (on commit/abort): If it were able to remove - // the txn from its corresponding entries in the timestamp cache, - // no more replays at the same timestamp would be possible. This - // appears to be a useful performance optimization; we could then - // not poison on EndTransaction. In fact, the above mechanism - // could be an effective alternative to sequence-cache based - // poisoning (or the whole sequence cache?) itself. - // - // TODO(tschottdorf): down the road, can probably unclog the system - // here by batching up a bunch of those GCRequests before proposing. - if args.Method() == roachpb.EndTransaction { - var ba roachpb.BatchRequest - txn := item.intents[0].Txn - gcArgs := roachpb.GCRequest{ - Span: roachpb.Span{ - Key: r.Desc().StartKey.AsRawKey(), - EndKey: r.Desc().EndKey.AsRawKey(), - }, - } - gcArgs.Keys = append(gcArgs.Keys, roachpb.GCRequest_GCKey{Key: keys.TransactionKey(txn.Key, txn.ID)}) - - ba.Add(&gcArgs) - if _, pErr := r.addWriteCmd(ctxWithTimeout, ba, nil /* nil */); pErr != nil { - log.Warningf("could not GC completed transaction: %s", pErr) - } - } - }) - } -} - // newReplicaCorruptionError creates a new error indicating a corrupt replica, // with the supplied list of errors given as history. func newReplicaCorruptionError(errs ...error) *roachpb.ReplicaCorruptionError { @@ -1938,114 +1857,6 @@ func (r *Replica) maybeSetCorrupt(pErr *roachpb.Error) *roachpb.Error { return pErr } -// resolveIntents resolves the given intents. For those which are local to the -// range, we submit directly to the range-local Raft instance; all non-local -// intents are resolved asynchronously in a batch. 
If `wait` is true, all -// operations are carried out synchronously and an error is returned. -// Otherwise, the call returns without error as soon as all local resolve -// commands have been **proposed** (not executed). This ensures that if a -// waiting client retries immediately after calling this function, it will not -// hit the same intents again. -func (r *Replica) resolveIntents(ctx context.Context, intents []roachpb.Intent, wait bool, poison bool) *roachpb.Error { - sp, cleanupSp := tracing.SpanFromContext(opReplica, r.store.Tracer(), ctx) - defer cleanupSp() - - ctx = opentracing.ContextWithSpan(ctx, nil) // we're doing async stuff below; those need new traces - sp.LogEvent(fmt.Sprintf("resolving intents [wait=%t]", wait)) - - var reqsRemote []roachpb.Request - baLocal := roachpb.BatchRequest{} - for i := range intents { - intent := intents[i] // avoids a race in `i, intent := range ...` - var resolveArgs roachpb.Request - var local bool // whether this intent lives on this Range - { - if len(intent.EndKey) == 0 { - resolveArgs = &roachpb.ResolveIntentRequest{ - Span: intent.Span, - IntentTxn: intent.Txn, - Status: intent.Status, - Poison: poison, - } - local = r.ContainsKey(intent.Key) - } else { - resolveArgs = &roachpb.ResolveIntentRangeRequest{ - Span: intent.Span, - IntentTxn: intent.Txn, - Status: intent.Status, - Poison: poison, - } - local = r.ContainsKeyRange(intent.Key, intent.EndKey) - } - } - - // If the intent isn't (completely) local, we'll need to send an external request. - // We'll batch them all up and send at the end. - if local { - baLocal.Add(resolveArgs) - } else { - reqsRemote = append(reqsRemote, resolveArgs) - } - } - - // The local batch goes directly to Raft. - var wg sync.WaitGroup - if len(baLocal.Requests) > 0 { - action := func() *roachpb.Error { - // Trace this under the ID of the intent owner. - sp := r.store.Tracer().StartSpan("resolve intents") - defer sp.Finish() - ctx = opentracing.ContextWithSpan(ctx, sp) - // Always operate with a timeout when resolving intents: this - // prevents rare shutdown timeouts in tests. - ctxWithTimeout, cancel := context.WithTimeout(ctx, base.NetworkTimeout) - defer cancel() - _, pErr := r.addWriteCmd(ctxWithTimeout, baLocal, &wg) - return pErr - } - wg.Add(1) - if wait || !r.store.Stopper().RunAsyncTask(func() { - if err := action(); err != nil { - log.Warningf("unable to resolve local intents; %s", err) - } - }) { - // Still run the task when draining. Our caller already has a task and - // going async here again is merely for performance, but some intents - // need to be resolved because they might block other tasks. See #1684. - // Note that handleSkippedIntents has a TODO in case #1684 comes back. - if err := action(); err != nil { - return err - } - } - } - - // Resolve all of the intents which aren't local to the Range. - if len(reqsRemote) > 0 { - b := &client.Batch{} - b.InternalAddRequest(reqsRemote...) - action := func() *roachpb.Error { - // TODO(tschottdorf): no tracing here yet. - return r.store.DB().Run(b) - } - if wait || !r.store.Stopper().RunAsyncTask(func() { - if err := action(); err != nil { - log.Warningf("unable to resolve external intents: %s", err) - } - }) { - // As with local intents, try async to not keep the caller waiting, but - // when draining just go ahead and do it synchronously. See #1684. - if err := action(); err != nil { - return err - } - } - } - - // Wait until the local ResolveIntents batch has been submitted to - // raft. No-op if all were non-local. 
- wg.Wait() - return nil -} - var errSystemConfigIntent = errors.New("must retry later due to intent on SystemConfigSpan") // loadSystemConfigSpan scans the entire SystemConfig span and returns the full @@ -2064,7 +1875,7 @@ func (r *Replica) loadSystemConfigSpan() ([]roachpb.KeyValue, []byte, error) { // There were intents, so what we read may not be consistent. Attempt // to nudge the intents in case they're expired; next time around we'll // hopefully have more luck. - r.handleSkippedIntents(intents) + r.store.intentResolver.processIntentsAsync(r, intents) return nil, nil, errSystemConfigIntent } kvs := br.Responses[0].GetInner().(*roachpb.ScanResponse).Rows diff --git a/storage/replica_test.go b/storage/replica_test.go index 324fe78edd9d..a311426c1fcb 100644 --- a/storage/replica_test.go +++ b/storage/replica_test.go @@ -2286,11 +2286,12 @@ func TestReplicaResolveIntentNoWait(t *testing.T) { setupResolutionTest(t, tc, roachpb.Key("a") /* irrelevant */, splitKey) txn := newTransaction("name", key, 1, roachpb.SERIALIZABLE, tc.clock) txn.Status = roachpb.COMMITTED - if pErr := tc.rng.resolveIntents(context.Background(), []roachpb.Intent{{ - Span: roachpb.Span{Key: key}, - Txn: txn.TxnMeta, - Status: txn.Status, - }}, false /* !wait */, false /* !poison; irrelevant */); pErr != nil { + if pErr := tc.store.intentResolver.resolveIntents(context.Background(), tc.rng, + []roachpb.Intent{{ + Span: roachpb.Span{Key: key}, + Txn: txn.TxnMeta, + Status: txn.Status, + }}, false /* !wait */, false /* !poison; irrelevant */); pErr != nil { t.Fatal(pErr) } util.SucceedsSoon(t, func() error { diff --git a/storage/store.go b/storage/store.go index 9702082fe705..8c4614153e23 100644 --- a/storage/store.go +++ b/storage/store.go @@ -268,6 +268,7 @@ type Store struct { replicaConsistencyQueue *replicaConsistencyQueue // Replica consistency check queue consistencyScanner *replicaScanner // Consistency checker scanner metrics *storeMetrics + intentResolver *intentResolver wakeRaftLoop chan struct{} started int32 stopper *stop.Stopper @@ -536,6 +537,7 @@ func NewStore(ctx StoreContext, eng engine.Engine, nodeDesc *roachpb.NodeDescrip raftRequestChan: make(chan *RaftMessageRequest, raftReqBufferSize), metrics: newStoreMetrics(), } + s.intentResolver = newIntentResolver(s) s.mu.Lock() s.mu.replicas = map[roachpb.RangeID]*Replica{} @@ -1587,16 +1589,7 @@ func (s *Store) Send(ctx context.Context, ba roachpb.BatchRequest) (br *roachpb. // after our operation started. This allows us to not have to // restart for uncertainty as we come back and read. h.Timestamp.Forward(now) - resolveIntents, pErrPush := s.resolveWriteIntentError(ctx, wiErr, rng, args, h, pushType) - if len(resolveIntents) > 0 { - if resErr := rng.resolveIntents(ctx, resolveIntents, false /* !wait */, true /* poison */); resErr != nil { - // When resolving asynchronously, errors should not - // usually be returned here, although there are some cases - // when they may be (especially when a test cluster is in - // the process of shutting down). - log.Warningf("asynchronous resolveIntents failed: %s", resErr) - } - } + pErrPush := s.intentResolver.processWriteIntentError(ctx, *wiErr, rng, args, h, pushType) // Preserve the error index. oldIndex := pErr.Index pErr = pErrPush @@ -1649,112 +1642,6 @@ func (s *Store) Send(ctx context.Context, ba roachpb.BatchRequest) (br *roachpb. return nil, pErr } -// resolveWriteIntentError tries to push the conflicting transaction (if -// necessary, i.e. 
if the transaction is pending): either move its timestamp -// forward on a read/write conflict, or abort it on a write/write conflict. If -// the push succeeds (or if it wasn't necessary), the error's Resolved flag is -// set to to true and the caller should call resolveIntents and retry its -// command immediately. If the push fails, the error's Resolved flag is set to -// false so that the client backs off before reissuing the command. On -// write/write conflicts, a potential push error is returned; otherwise the -// updated WriteIntentError is returned. -// -// Callers are involved with -// a) conflict resolution for commands being executed at the Store with the -// client waiting, -// b) resolving intents encountered during inconsistent operations, and -// c) resolving intents upon EndTransaction which are not local to the given -// range. This is the only path in which the transaction is going to be -// in non-pending state and doesn't require a push. -func (s *Store) resolveWriteIntentError(ctx context.Context, wiErr *roachpb.WriteIntentError, rng *Replica, args roachpb.Request, h roachpb.Header, pushType roachpb.PushTxnType) ([]roachpb.Intent, *roachpb.Error) { - method := args.Method() - pusherTxn := h.Txn - readOnly := roachpb.IsReadOnly(args) // TODO(tschottdorf): pass as param - args = nil - - if log.V(6) { - log.Infoc(ctx, "resolving write intent %s", wiErr) - } - sp, cleanupSp := tracing.SpanFromContext(opStore, s.Tracer(), ctx) - defer cleanupSp() - sp.LogEvent("intent resolution") - - // Split intents into those we need to push and those which are good to - // resolve. - // TODO(tschottdorf): can optimize this and use same underlying slice. - var pushIntents, resolveIntents []roachpb.Intent - for _, intent := range wiErr.Intents { - // The current intent does not need conflict resolution. - if intent.Status != roachpb.PENDING { - resolveIntents = append(resolveIntents, intent) - } else { - pushIntents = append(pushIntents, intent) - } - } - - // Attempt to push the transaction(s) which created the conflicting intent(s). - now := s.Clock().Now() - - // TODO(tschottdorf): need deduplication here (many pushes for the same - // txn are awkward but even worse, could ratchet up the priority). - // If there's no pusher, we communicate a priority by sending an empty - // txn with only the priority set. - if pusherTxn == nil { - pusherTxn = &roachpb.Transaction{ - Priority: roachpb.MakePriority(h.UserPriority), - } - } - var pushReqs []roachpb.Request - for _, intent := range pushIntents { - pushReqs = append(pushReqs, &roachpb.PushTxnRequest{ - Span: roachpb.Span{ - Key: intent.Txn.Key, - }, - PusherTxn: *pusherTxn, - PusheeTxn: intent.Txn, - PushTo: h.Timestamp, - // The timestamp is used by PushTxn for figuring out whether the - // transaction is abandoned. If we used the argument's timestamp - // here, we would run into busy loops because that timestamp - // usually stays fixed among retries, so it will never realize - // that a transaction has timed out. See #877. - Now: now, - PushType: pushType, - }) - } - // TODO(kaneda): Set the transaction in the header so that the - // txn is correctly propagated in an error response. - b := &client.Batch{} - b.InternalAddRequest(pushReqs...) - br, pushErr := s.db.RunWithResponse(b) - if pushErr != nil { - if log.V(1) { - log.Infoc(ctx, "on %s: %s", method, pushErr) - } - - // For write/write conflicts within a transaction, propagate the - // push failure, not the original write intent error. 
The push - // failure will instruct the client to restart the transaction - // with a backoff. - if pusherTxn.ID != nil && !readOnly { - return nil, pushErr - } - // For read/write conflicts, return the write intent error which - // engages backoff/retry (with !Resolved). We don't need to - // restart the txn, only resend the read with a backoff. - return nil, roachpb.NewError(wiErr) - } - wiErr.Resolved = true // success! - - for i, intent := range pushIntents { - pushee := br.Responses[i].GetInner().(*roachpb.PushTxnResponse).PusheeTxn - intent.Txn = pushee.TxnMeta - intent.Status = pushee.Status - resolveIntents = append(resolveIntents, intent) - } - return resolveIntents, roachpb.NewError(wiErr) -} - // enqueueRaftMessage enqueues a request for eventual processing. It // returns an error to conform to the RPC interface but it always // returns nil without waiting for the message to be processed. diff --git a/util/stop/stopper.go b/util/stop/stopper.go index fd0a7f95b310..203f3cecc952 100644 --- a/util/stop/stopper.go +++ b/util/stop/stopper.go @@ -142,6 +142,43 @@ func (s *Stopper) RunAsyncTask(f func()) bool { return true } +// RunLimitedAsyncTask runs function f in a goroutine, using the given +// channel as a semaphore to limit the number of tasks that are run +// concurrently to the channel's capacity. Blocks until the semaphore +// is available in order to push back on callers that may be trying to +// create many tasks. Returns false if the Stopper is draining and the +// function is not executed. +func (s *Stopper) RunLimitedAsyncTask(sem chan struct{}, f func()) bool { + file, line, _ := caller.Lookup(1) + key := taskKey{file, line} + + // Wait for permission to run from the semaphore. + select { + case sem <- struct{}{}: + case <-s.ShouldDrain(): + return false + default: + log.Printf("stopper throttling task from %s:%d due to semaphore", file, line) + // Retry the select without the default. 
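+ // This time the send blocks until a semaphore slot frees up or the
+ // stopper begins draining.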
+ select { + case sem <- struct{}{}: + case <-s.ShouldDrain(): + return false + } + } + + if !s.runPrelude(key) { + <-sem + return false + } + go func() { + defer s.runPostlude(key) + defer func() { <-sem }() + f() + }() + return true +} + func (s *Stopper) runPrelude(key taskKey) bool { s.mu.Lock() defer s.mu.Unlock() diff --git a/util/stop/stopper_test.go b/util/stop/stopper_test.go index 346a67b579a1..48b9523cf6d2 100644 --- a/util/stop/stopper_test.go +++ b/util/stop/stopper_test.go @@ -18,6 +18,7 @@ package stop_test import ( "fmt" + "sync" "testing" "time" @@ -374,6 +375,47 @@ func TestStopperShouldDrain(t *testing.T) { <-cleanup } +func TestStopperRunLimitedAsyncTask(t *testing.T) { + defer leaktest.AfterTest(t)() + s := stop.NewStopper() + defer s.Stop() + + const maxConcurrency = 5 + const duration = 10 * time.Millisecond + sem := make(chan struct{}, maxConcurrency) + var mu sync.Mutex + concurrency := 0 + peakConcurrency := 0 + var wg sync.WaitGroup + + f := func() { + mu.Lock() + concurrency++ + if concurrency > peakConcurrency { + peakConcurrency = concurrency + } + mu.Unlock() + time.Sleep(duration) + mu.Lock() + concurrency-- + mu.Unlock() + wg.Done() + } + + for i := 0; i < maxConcurrency*3; i++ { + wg.Add(1) + s.RunLimitedAsyncTask(sem, f) + } + wg.Wait() + if concurrency != 0 { + t.Fatalf("expected 0 concurrency at end of test but got %d", concurrency) + } + if peakConcurrency != maxConcurrency { + t.Fatalf("expected peak concurrency %d to equal max concurrency %d", + peakConcurrency, maxConcurrency) + } +} + func maybePrint() { if testing.Verbose() { // This just needs to be complicated enough not to inline. fmt.Println("blah")
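
For reference, a minimal standalone sketch of the bounded-concurrency pattern that RunLimitedAsyncTask builds on: a buffered channel serves as a counting semaphore, so at most cap(sem) task bodies run at once and further callers block on the send until a slot frees up. The helper name runLimited and the package layout are illustrative only and are not part of this change.

package main

import (
	"fmt"
	"sync"
	"time"
)

// runLimited starts f in a goroutine once a slot in sem is available,
// blocking the caller while the semaphore is full. The slot is released
// when f returns.
func runLimited(sem chan struct{}, wg *sync.WaitGroup, f func()) {
	sem <- struct{}{} // acquire; blocks while cap(sem) tasks are running
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer func() { <-sem }() // release the slot
		f()
	}()
}

func main() {
	const limit = 5
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	for i := 0; i < 3*limit; i++ {
		i := i // capture loop variable
		runLimited(sem, &wg, func() {
			time.Sleep(10 * time.Millisecond)
			fmt.Println("task", i, "done")
		})
	}
	wg.Wait()
}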