diff --git a/storage/gc_queue.go b/storage/gc_queue.go index 7c2336c45251..d331317d976d 100644 --- a/storage/gc_queue.go +++ b/storage/gc_queue.go @@ -283,7 +283,8 @@ func (gcq *gcQueue) process(now roachpb.Timestamp, repl *Replica, } } - if pErr := repl.store.intentResolver.resolveIntents(repl.context(), repl, intents, true /* wait */, false /* !poison */); pErr != nil { + if pErr := repl.store.intentResolver.resolveIntents(repl.context(), repl, intents, + true /* wait */, false /* !poison */); pErr != nil { return pErr.GoError() } diff --git a/storage/intent_resolver.go b/storage/intent_resolver.go index 091be0673e2d..252a22b01256 100644 --- a/storage/intent_resolver.go +++ b/storage/intent_resolver.go @@ -44,12 +44,12 @@ const intentResolverTaskLimit = 100 type intentResolver struct { store *Store - sem chan struct{} // Semaphore to limit async goroutines + sem chan struct{} // Semaphore to limit async goroutines. mu struct { sync.Mutex // Maps transaction ids to a refcount. - inFlight map[*uuid.UUID]int + inFlight map[uuid.UUID]int } } @@ -58,7 +58,7 @@ func newIntentResolver(store *Store) *intentResolver { store: store, sem: make(chan struct{}, intentResolverTaskLimit), } - ir.mu.inFlight = map[*uuid.UUID]int{} + ir.mu.inFlight = map[uuid.UUID]int{} return ir } @@ -69,10 +69,13 @@ func newIntentResolver(store *Store) *intentResolver { // // The returned error may be a copy of the original WriteIntentError, // with or without the Resolved flag set, which governs the client's -// retry behavior. (if the transaction is pushed, the Resolved flag is +// retry behavior (if the transaction is pushed, the Resolved flag is // set to tell the client to retry immediately; otherwise it is false // to cause the client to back off). -func (ir *intentResolver) processWriteIntentError(ctx context.Context, wiErr roachpb.WriteIntentError, r *Replica, args roachpb.Request, h roachpb.Header, pushType roachpb.PushTxnType) *roachpb.Error { +func (ir *intentResolver) processWriteIntentError(ctx context.Context, + wiErr roachpb.WriteIntentError, r *Replica, args roachpb.Request, h roachpb.Header, + pushType roachpb.PushTxnType) *roachpb.Error { + if log.V(6) { log.Infoc(ctx, "resolving write intent %s", wiErr) } @@ -81,7 +84,8 @@ func (ir *intentResolver) processWriteIntentError(ctx context.Context, wiErr roa readOnly := roachpb.IsReadOnly(args) // TODO(tschottdorf): pass as param resolveIntents, pushErr := ir.maybePushTransactions(ctx, wiErr.Intents, h, pushType, false) - if resErr := ir.resolveIntents(ctx, r, resolveIntents, false /* !wait */, true /* poison */); resErr != nil { + if resErr := ir.resolveIntents(ctx, r, resolveIntents, + false /* !wait */, true /* poison */); resErr != nil { // When resolving without waiting, errors should not // usually be returned here, although there are some cases // when they may be (especially when a test cluster is in @@ -127,6 +131,14 @@ func (ir *intentResolver) processWriteIntentError(ctx context.Context, wiErr roa // maybePushTransaction, but if the error is non-nil then some of the // conflicting transactions may still be pending. // +// If skipInFlight is true, then no PushTxns will be sent and no +// intents will be returned for any transaction for which there is +// another push in progress. This should only be used by callers who +// are not relying on the side effect of a push (i.e. only +// pushType==PUSH_TOUCH), and who also don't need to synchronize with +// the resolution of those intents (e.g. asynchronous resolutions of +// intents skipped on inconsistent reads). +// // Callers are involved with // a) conflict resolution for commands being executed at the Store with the // client waiting, @@ -134,7 +146,10 @@ func (ir *intentResolver) processWriteIntentError(ctx context.Context, wiErr roa // c) resolving intents upon EndTransaction which are not local to the given // range. This is the only path in which the transaction is going to be // in non-pending state and doesn't require a push. -func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []roachpb.Intent, h roachpb.Header, pushType roachpb.PushTxnType, skipInFlight bool) ([]roachpb.Intent, *roachpb.Error) { +func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []roachpb.Intent, + h roachpb.Header, pushType roachpb.PushTxnType, skipInFlight bool) ( + []roachpb.Intent, *roachpb.Error) { + now := ir.store.Clock().Now() pusherTxn := h.Txn @@ -161,13 +176,16 @@ func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []r // because the transaction is already finalized. // TODO(bdarnell): can this happen any more? resolveIntents = append(resolveIntents, intent) - } else if _, ok := ir.mu.inFlight[intent.Txn.ID]; ok && skipInFlight { + } else if _, ok := ir.mu.inFlight[*intent.Txn.ID]; ok && skipInFlight { // Another goroutine is working on this transaction so we can // skip it. + if log.V(1) { + log.Infof("skipping PushTxn for %s; attempt already in flight", intent.Txn.ID) + } continue } else { pushIntents = append(pushIntents, intent) - ir.mu.inFlight[intent.Txn.ID]++ + ir.mu.inFlight[*intent.Txn.ID]++ } } ir.mu.Unlock() @@ -198,9 +216,9 @@ func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []r br, err := ir.store.db.RunWithResponse(b) ir.mu.Lock() for _, intent := range pushIntents { - ir.mu.inFlight[intent.Txn.ID]-- - if ir.mu.inFlight[intent.Txn.ID] == 0 { - delete(ir.mu.inFlight, intent.Txn.ID) + ir.mu.inFlight[*intent.Txn.ID]-- + if ir.mu.inFlight[*intent.Txn.ID] == 0 { + delete(ir.mu.inFlight, *intent.Txn.ID) } } ir.mu.Unlock() @@ -221,7 +239,7 @@ func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []r // processIntentsAsync asynchronously processes intents which were // encountered during another command but did not interfere with the // execution of that command. This occurs in two cases: inconsistent -// reads and EndTransaction (which queues its own intents for +// reads and EndTransaction (which queues its own external intents for // processing via this method). The two cases are handled somewhat // differently and would be better served by different entry points, // but combining them simplifies the plumbing necessary in Replica. @@ -245,7 +263,8 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA h := roachpb.Header{Timestamp: now} resolveIntents, pushErr := ir.maybePushTransactions(ctxWithTimeout, item.intents, h, roachpb.PUSH_TOUCH, true /* skipInFlight */) - if pErr := ir.resolveIntents(ctxWithTimeout, r, resolveIntents, true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { + if pErr := ir.resolveIntents(ctxWithTimeout, r, resolveIntents, + true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { log.Warningc(ctxWithTimeout, "failed to resolve intents: %s", pErr) return } @@ -261,7 +280,8 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA // For EndTransaction, we know the transaction is finalized so // we can skip the push and go straight to the resolve. - if pErr := ir.resolveIntents(ctxWithTimeout, r, item.intents, true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { + if pErr := ir.resolveIntents(ctxWithTimeout, r, item.intents, + true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { log.Warningc(ctxWithTimeout, "failed to resolve intents: %s", pErr) return } @@ -302,8 +322,9 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA EndKey: r.Desc().EndKey.AsRawKey(), }, } - gcArgs.Keys = append(gcArgs.Keys, roachpb.GCRequest_GCKey{Key: keys.TransactionKey(txn.Key, txn.ID)}) - + gcArgs.Keys = append(gcArgs.Keys, roachpb.GCRequest_GCKey{ + Key: keys.TransactionKey(txn.Key, txn.ID), + }) ba.Add(&gcArgs) if _, pErr := r.addWriteCmd(ctxWithTimeout, ba, nil /* nil */); pErr != nil { log.Warningf("could not GC completed transaction: %s", pErr) @@ -322,11 +343,14 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA // executed). This ensures that if a waiting client retries // immediately after calling this function, it will not hit the same // intents again. -func (ir *intentResolver) resolveIntents(ctx context.Context, r *Replica, intents []roachpb.Intent, wait bool, poison bool) *roachpb.Error { +func (ir *intentResolver) resolveIntents(ctx context.Context, r *Replica, + intents []roachpb.Intent, wait bool, poison bool) *roachpb.Error { + sp, cleanupSp := tracing.SpanFromContext(opReplica, ir.store.Tracer(), ctx) defer cleanupSp() - ctx = opentracing.ContextWithSpan(ctx, nil) // we're doing async stuff below; those need new traces + // We're doing async stuff below; those need new traces. + ctx = opentracing.ContextWithSpan(ctx, nil) sp.LogEvent(fmt.Sprintf("resolving intents [wait=%t]", wait)) var reqsRemote []roachpb.Request diff --git a/storage/replica_test.go b/storage/replica_test.go index e1445dde6d08..a311426c1fcb 100644 --- a/storage/replica_test.go +++ b/storage/replica_test.go @@ -2286,11 +2286,12 @@ func TestReplicaResolveIntentNoWait(t *testing.T) { setupResolutionTest(t, tc, roachpb.Key("a") /* irrelevant */, splitKey) txn := newTransaction("name", key, 1, roachpb.SERIALIZABLE, tc.clock) txn.Status = roachpb.COMMITTED - if pErr := tc.store.intentResolver.resolveIntents(context.Background(), tc.rng, []roachpb.Intent{{ - Span: roachpb.Span{Key: key}, - Txn: txn.TxnMeta, - Status: txn.Status, - }}, false /* !wait */, false /* !poison; irrelevant */); pErr != nil { + if pErr := tc.store.intentResolver.resolveIntents(context.Background(), tc.rng, + []roachpb.Intent{{ + Span: roachpb.Span{Key: key}, + Txn: txn.TxnMeta, + Status: txn.Status, + }}, false /* !wait */, false /* !poison; irrelevant */); pErr != nil { t.Fatal(pErr) } util.SucceedsSoon(t, func() error { diff --git a/util/stop/stopper.go b/util/stop/stopper.go index 25c699197ec8..203f3cecc952 100644 --- a/util/stop/stopper.go +++ b/util/stop/stopper.go @@ -151,11 +151,22 @@ func (s *Stopper) RunAsyncTask(f func()) bool { func (s *Stopper) RunLimitedAsyncTask(sem chan struct{}, f func()) bool { file, line, _ := caller.Lookup(1) key := taskKey{file, line} + + // Wait for permission to run from the semaphore. select { case sem <- struct{}{}: case <-s.ShouldDrain(): return false + default: + log.Printf("stopper throttling task from %s:%d due to semaphore", file, line) + // Retry the select without the default. + select { + case sem <- struct{}{}: + case <-s.ShouldDrain(): + return false + } } + if !s.runPrelude(key) { <-sem return false