From 38cafcbda1acb7794314ca00cc6f45d7957d7936 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Sun, 13 Mar 2016 18:18:36 -0400 Subject: [PATCH 1/9] storage: Move intent resolution methods to their own file --- storage/intent_resolver.go | 327 +++++++++++++++++++++++++++++++++++++ storage/replica.go | 189 --------------------- storage/store.go | 106 ------------ 3 files changed, 327 insertions(+), 295 deletions(-) create mode 100644 storage/intent_resolver.go diff --git a/storage/intent_resolver.go b/storage/intent_resolver.go new file mode 100644 index 000000000000..8004dd9c44f3 --- /dev/null +++ b/storage/intent_resolver.go @@ -0,0 +1,327 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. See the AUTHORS file +// for names of contributors. +// +// Author: Ben Darnell + +package storage + +import ( + "fmt" + "sync" + + "github.com/cockroachdb/cockroach/base" + "github.com/cockroachdb/cockroach/client" + "github.com/cockroachdb/cockroach/keys" + "github.com/cockroachdb/cockroach/roachpb" + "github.com/cockroachdb/cockroach/util" + "github.com/cockroachdb/cockroach/util/log" + "github.com/cockroachdb/cockroach/util/tracing" + "github.com/opentracing/opentracing-go" + "golang.org/x/net/context" +) + +// resolveWriteIntentError tries to push the conflicting transaction (if +// necessary, i.e. if the transaction is pending): either move its timestamp +// forward on a read/write conflict, or abort it on a write/write conflict. If +// the push succeeds (or if it wasn't necessary), the error's Resolved flag is +// set to to true and the caller should call resolveIntents and retry its +// command immediately. If the push fails, the error's Resolved flag is set to +// false so that the client backs off before reissuing the command. On +// write/write conflicts, a potential push error is returned; otherwise the +// updated WriteIntentError is returned. +// +// Callers are involved with +// a) conflict resolution for commands being executed at the Store with the +// client waiting, +// b) resolving intents encountered during inconsistent operations, and +// c) resolving intents upon EndTransaction which are not local to the given +// range. This is the only path in which the transaction is going to be +// in non-pending state and doesn't require a push. +func (s *Store) resolveWriteIntentError(ctx context.Context, wiErr *roachpb.WriteIntentError, rng *Replica, args roachpb.Request, h roachpb.Header, pushType roachpb.PushTxnType) ([]roachpb.Intent, *roachpb.Error) { + method := args.Method() + pusherTxn := h.Txn + readOnly := roachpb.IsReadOnly(args) // TODO(tschottdorf): pass as param + args = nil + + if log.V(6) { + log.Infoc(ctx, "resolving write intent %s", wiErr) + } + sp, cleanupSp := tracing.SpanFromContext(opStore, s.Tracer(), ctx) + defer cleanupSp() + sp.LogEvent("intent resolution") + + // Split intents into those we need to push and those which are good to + // resolve. + // TODO(tschottdorf): can optimize this and use same underlying slice. 
+ var pushIntents, resolveIntents []roachpb.Intent + for _, intent := range wiErr.Intents { + // The current intent does not need conflict resolution. + if intent.Status != roachpb.PENDING { + resolveIntents = append(resolveIntents, intent) + } else { + pushIntents = append(pushIntents, intent) + } + } + + // Attempt to push the transaction(s) which created the conflicting intent(s). + now := s.Clock().Now() + + // TODO(tschottdorf): need deduplication here (many pushes for the same + // txn are awkward but even worse, could ratchet up the priority). + // If there's no pusher, we communicate a priority by sending an empty + // txn with only the priority set. + if pusherTxn == nil { + pusherTxn = &roachpb.Transaction{ + Priority: roachpb.MakePriority(h.UserPriority), + } + } + var pushReqs []roachpb.Request + for _, intent := range pushIntents { + pushReqs = append(pushReqs, &roachpb.PushTxnRequest{ + Span: roachpb.Span{ + Key: intent.Txn.Key, + }, + PusherTxn: *pusherTxn, + PusheeTxn: intent.Txn, + PushTo: h.Timestamp, + // The timestamp is used by PushTxn for figuring out whether the + // transaction is abandoned. If we used the argument's timestamp + // here, we would run into busy loops because that timestamp + // usually stays fixed among retries, so it will never realize + // that a transaction has timed out. See #877. + Now: now, + PushType: pushType, + }) + } + // TODO(kaneda): Set the transaction in the header so that the + // txn is correctly propagated in an error response. + b := &client.Batch{} + b.InternalAddRequest(pushReqs...) + br, pushErr := s.db.RunWithResponse(b) + if pushErr != nil { + if log.V(1) { + log.Infoc(ctx, "on %s: %s", method, pushErr) + } + + // For write/write conflicts within a transaction, propagate the + // push failure, not the original write intent error. The push + // failure will instruct the client to restart the transaction + // with a backoff. + if pusherTxn.ID != nil && !readOnly { + return nil, pushErr + } + // For read/write conflicts, return the write intent error which + // engages backoff/retry (with !Resolved). We don't need to + // restart the txn, only resend the read with a backoff. + return nil, roachpb.NewError(wiErr) + } + wiErr.Resolved = true // success! + + for i, intent := range pushIntents { + pushee := br.Responses[i].GetInner().(*roachpb.PushTxnResponse).PusheeTxn + intent.Txn = pushee.TxnMeta + intent.Status = pushee.Status + resolveIntents = append(resolveIntents, intent) + } + return resolveIntents, roachpb.NewError(wiErr) +} + +func (r *Replica) handleSkippedIntents(intents []intentsWithArg) { + if len(intents) == 0 { + return + } + now := r.store.Clock().Now() + ctx := r.context() + stopper := r.store.Stopper() + + for _, item := range intents { + // TODO(tschottdorf): avoid data race related to batch unrolling in ExecuteCmd; + // can probably go again when that provisional code there is gone. Should + // still be careful though, a retry could happen and race with args. + args := util.CloneProto(item.args).(roachpb.Request) + stopper.RunAsyncTask(func() { + // Everything here is best effort; give up rather than waiting + // too long (helps avoid deadlocks during test shutdown, + // although this is imperfect due to the use of an + // uninterruptible WaitGroup.Wait in beginCmds). 
+ ctxWithTimeout, cancel := context.WithTimeout(ctx, base.NetworkTimeout) + defer cancel() + h := roachpb.Header{Timestamp: now} + resolveIntents, pErr := r.store.resolveWriteIntentError(ctxWithTimeout, &roachpb.WriteIntentError{ + Intents: item.intents, + }, r, args, h, roachpb.PUSH_TOUCH) + if wiErr, ok := pErr.GetDetail().(*roachpb.WriteIntentError); !ok || wiErr == nil || !wiErr.Resolved { + log.Warningc(ctxWithTimeout, "failed to push during intent resolution: %s", pErr) + return + } + if pErr := r.resolveIntents(ctxWithTimeout, resolveIntents, true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { + log.Warningc(ctxWithTimeout, "failed to resolve intents: %s", pErr) + return + } + // We successfully resolved the intents, so we're able to GC from + // the txn span directly. Note that the sequence cache was cleared + // out synchronously with EndTransaction (see comments within for + // an explanation of why that is kosher). + // + // Note that we poisoned the sequence caches on the external ranges + // above. This may seem counter-intuitive, but it's actually + // necessary: Assume a transaction has committed here, with two + // external intents, and assume that we did not poison. Normally, + // these two intents would be resolved in the same batch, but that + // is not guaranteed (for example, if DistSender has a stale + // descriptor after a Merge). When resolved separately, the first + // ResolveIntent would clear out the sequence cache; an individual + // write on the second (still present) intent could then be + // replayed and would resolve to a real value (at least for a + // window of time unless we delete the local txn entry). That's not + // OK for non-idempotent commands such as Increment. + // TODO(tschottdorf): We should have another side effect on + // MVCCResolveIntent (on commit/abort): If it were able to remove + // the txn from its corresponding entries in the timestamp cache, + // no more replays at the same timestamp would be possible. This + // appears to be a useful performance optimization; we could then + // not poison on EndTransaction. In fact, the above mechanism + // could be an effective alternative to sequence-cache based + // poisoning (or the whole sequence cache?) itself. + // + // TODO(tschottdorf): down the road, can probably unclog the system + // here by batching up a bunch of those GCRequests before proposing. + if args.Method() == roachpb.EndTransaction { + var ba roachpb.BatchRequest + txn := item.intents[0].Txn + gcArgs := roachpb.GCRequest{ + Span: roachpb.Span{ + Key: r.Desc().StartKey.AsRawKey(), + EndKey: r.Desc().EndKey.AsRawKey(), + }, + } + gcArgs.Keys = append(gcArgs.Keys, roachpb.GCRequest_GCKey{Key: keys.TransactionKey(txn.Key, txn.ID)}) + + ba.Add(&gcArgs) + if _, pErr := r.addWriteCmd(ctxWithTimeout, ba, nil /* nil */); pErr != nil { + log.Warningf("could not GC completed transaction: %s", pErr) + } + } + }) + } +} + +// resolveIntents resolves the given intents. For those which are local to the +// range, we submit directly to the range-local Raft instance; all non-local +// intents are resolved asynchronously in a batch. If `wait` is true, all +// operations are carried out synchronously and an error is returned. +// Otherwise, the call returns without error as soon as all local resolve +// commands have been **proposed** (not executed). This ensures that if a +// waiting client retries immediately after calling this function, it will not +// hit the same intents again. 
+func (r *Replica) resolveIntents(ctx context.Context, intents []roachpb.Intent, wait bool, poison bool) *roachpb.Error { + sp, cleanupSp := tracing.SpanFromContext(opReplica, r.store.Tracer(), ctx) + defer cleanupSp() + + ctx = opentracing.ContextWithSpan(ctx, nil) // we're doing async stuff below; those need new traces + sp.LogEvent(fmt.Sprintf("resolving intents [wait=%t]", wait)) + + var reqsRemote []roachpb.Request + baLocal := roachpb.BatchRequest{} + for i := range intents { + intent := intents[i] // avoids a race in `i, intent := range ...` + var resolveArgs roachpb.Request + var local bool // whether this intent lives on this Range + { + if len(intent.EndKey) == 0 { + resolveArgs = &roachpb.ResolveIntentRequest{ + Span: intent.Span, + IntentTxn: intent.Txn, + Status: intent.Status, + Poison: poison, + } + local = r.ContainsKey(intent.Key) + } else { + resolveArgs = &roachpb.ResolveIntentRangeRequest{ + Span: intent.Span, + IntentTxn: intent.Txn, + Status: intent.Status, + Poison: poison, + } + local = r.ContainsKeyRange(intent.Key, intent.EndKey) + } + } + + // If the intent isn't (completely) local, we'll need to send an external request. + // We'll batch them all up and send at the end. + if local { + baLocal.Add(resolveArgs) + } else { + reqsRemote = append(reqsRemote, resolveArgs) + } + } + + // The local batch goes directly to Raft. + var wg sync.WaitGroup + if len(baLocal.Requests) > 0 { + action := func() *roachpb.Error { + // Trace this under the ID of the intent owner. + sp := r.store.Tracer().StartSpan("resolve intents") + defer sp.Finish() + ctx = opentracing.ContextWithSpan(ctx, sp) + // Always operate with a timeout when resolving intents: this + // prevents rare shutdown timeouts in tests. + ctxWithTimeout, cancel := context.WithTimeout(ctx, base.NetworkTimeout) + defer cancel() + _, pErr := r.addWriteCmd(ctxWithTimeout, baLocal, &wg) + return pErr + } + wg.Add(1) + if wait || !r.store.Stopper().RunAsyncTask(func() { + if err := action(); err != nil { + log.Warningf("unable to resolve local intents; %s", err) + } + }) { + // Still run the task when draining. Our caller already has a task and + // going async here again is merely for performance, but some intents + // need to be resolved because they might block other tasks. See #1684. + // Note that handleSkippedIntents has a TODO in case #1684 comes back. + if err := action(); err != nil { + return err + } + } + } + + // Resolve all of the intents which aren't local to the Range. + if len(reqsRemote) > 0 { + b := &client.Batch{} + b.InternalAddRequest(reqsRemote...) + action := func() *roachpb.Error { + // TODO(tschottdorf): no tracing here yet. + return r.store.DB().Run(b) + } + if wait || !r.store.Stopper().RunAsyncTask(func() { + if err := action(); err != nil { + log.Warningf("unable to resolve external intents: %s", err) + } + }) { + // As with local intents, try async to not keep the caller waiting, but + // when draining just go ahead and do it synchronously. See #1684. + if err := action(); err != nil { + return err + } + } + } + + // Wait until the local ResolveIntents batch has been submitted to + // raft. No-op if all were non-local. 
+ wg.Wait() + return nil +} diff --git a/storage/replica.go b/storage/replica.go index db5e7fdf38eb..48f604d33aa9 100644 --- a/storage/replica.go +++ b/storage/replica.go @@ -35,7 +35,6 @@ import ( opentracing "github.com/opentracing/opentracing-go" "golang.org/x/net/context" - "github.com/cockroachdb/cockroach/base" "github.com/cockroachdb/cockroach/client" "github.com/cockroachdb/cockroach/config" "github.com/cockroachdb/cockroach/gossip" @@ -1828,86 +1827,6 @@ func (r *Replica) maybeGossipSystemConfig() { r.systemDBHash = hash } -func (r *Replica) handleSkippedIntents(intents []intentsWithArg) { - if len(intents) == 0 { - return - } - now := r.store.Clock().Now() - ctx := r.context() - stopper := r.store.Stopper() - - for _, item := range intents { - // TODO(tschottdorf): avoid data race related to batch unrolling in ExecuteCmd; - // can probably go again when that provisional code there is gone. Should - // still be careful though, a retry could happen and race with args. - args := util.CloneProto(item.args).(roachpb.Request) - stopper.RunAsyncTask(func() { - // Everything here is best effort; give up rather than waiting - // too long (helps avoid deadlocks during test shutdown, - // although this is imperfect due to the use of an - // uninterruptible WaitGroup.Wait in beginCmds). - ctxWithTimeout, cancel := context.WithTimeout(ctx, base.NetworkTimeout) - defer cancel() - h := roachpb.Header{Timestamp: now} - resolveIntents, pErr := r.store.resolveWriteIntentError(ctxWithTimeout, &roachpb.WriteIntentError{ - Intents: item.intents, - }, r, args, h, roachpb.PUSH_TOUCH) - if wiErr, ok := pErr.GetDetail().(*roachpb.WriteIntentError); !ok || wiErr == nil || !wiErr.Resolved { - log.Warningc(ctxWithTimeout, "failed to push during intent resolution: %s", pErr) - return - } - if pErr := r.resolveIntents(ctxWithTimeout, resolveIntents, true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { - log.Warningc(ctxWithTimeout, "failed to resolve intents: %s", pErr) - return - } - // We successfully resolved the intents, so we're able to GC from - // the txn span directly. Note that the sequence cache was cleared - // out synchronously with EndTransaction (see comments within for - // an explanation of why that is kosher). - // - // Note that we poisoned the sequence caches on the external ranges - // above. This may seem counter-intuitive, but it's actually - // necessary: Assume a transaction has committed here, with two - // external intents, and assume that we did not poison. Normally, - // these two intents would be resolved in the same batch, but that - // is not guaranteed (for example, if DistSender has a stale - // descriptor after a Merge). When resolved separately, the first - // ResolveIntent would clear out the sequence cache; an individual - // write on the second (still present) intent could then be - // replayed and would resolve to a real value (at least for a - // window of time unless we delete the local txn entry). That's not - // OK for non-idempotent commands such as Increment. - // TODO(tschottdorf): We should have another side effect on - // MVCCResolveIntent (on commit/abort): If it were able to remove - // the txn from its corresponding entries in the timestamp cache, - // no more replays at the same timestamp would be possible. This - // appears to be a useful performance optimization; we could then - // not poison on EndTransaction. 
In fact, the above mechanism - // could be an effective alternative to sequence-cache based - // poisoning (or the whole sequence cache?) itself. - // - // TODO(tschottdorf): down the road, can probably unclog the system - // here by batching up a bunch of those GCRequests before proposing. - if args.Method() == roachpb.EndTransaction { - var ba roachpb.BatchRequest - txn := item.intents[0].Txn - gcArgs := roachpb.GCRequest{ - Span: roachpb.Span{ - Key: r.Desc().StartKey.AsRawKey(), - EndKey: r.Desc().EndKey.AsRawKey(), - }, - } - gcArgs.Keys = append(gcArgs.Keys, roachpb.GCRequest_GCKey{Key: keys.TransactionKey(txn.Key, txn.ID)}) - - ba.Add(&gcArgs) - if _, pErr := r.addWriteCmd(ctxWithTimeout, ba, nil /* nil */); pErr != nil { - log.Warningf("could not GC completed transaction: %s", pErr) - } - } - }) - } -} - // newReplicaCorruptionError creates a new error indicating a corrupt replica, // with the supplied list of errors given as history. func newReplicaCorruptionError(errs ...error) *roachpb.ReplicaCorruptionError { @@ -1938,114 +1857,6 @@ func (r *Replica) maybeSetCorrupt(pErr *roachpb.Error) *roachpb.Error { return pErr } -// resolveIntents resolves the given intents. For those which are local to the -// range, we submit directly to the range-local Raft instance; all non-local -// intents are resolved asynchronously in a batch. If `wait` is true, all -// operations are carried out synchronously and an error is returned. -// Otherwise, the call returns without error as soon as all local resolve -// commands have been **proposed** (not executed). This ensures that if a -// waiting client retries immediately after calling this function, it will not -// hit the same intents again. -func (r *Replica) resolveIntents(ctx context.Context, intents []roachpb.Intent, wait bool, poison bool) *roachpb.Error { - sp, cleanupSp := tracing.SpanFromContext(opReplica, r.store.Tracer(), ctx) - defer cleanupSp() - - ctx = opentracing.ContextWithSpan(ctx, nil) // we're doing async stuff below; those need new traces - sp.LogEvent(fmt.Sprintf("resolving intents [wait=%t]", wait)) - - var reqsRemote []roachpb.Request - baLocal := roachpb.BatchRequest{} - for i := range intents { - intent := intents[i] // avoids a race in `i, intent := range ...` - var resolveArgs roachpb.Request - var local bool // whether this intent lives on this Range - { - if len(intent.EndKey) == 0 { - resolveArgs = &roachpb.ResolveIntentRequest{ - Span: intent.Span, - IntentTxn: intent.Txn, - Status: intent.Status, - Poison: poison, - } - local = r.ContainsKey(intent.Key) - } else { - resolveArgs = &roachpb.ResolveIntentRangeRequest{ - Span: intent.Span, - IntentTxn: intent.Txn, - Status: intent.Status, - Poison: poison, - } - local = r.ContainsKeyRange(intent.Key, intent.EndKey) - } - } - - // If the intent isn't (completely) local, we'll need to send an external request. - // We'll batch them all up and send at the end. - if local { - baLocal.Add(resolveArgs) - } else { - reqsRemote = append(reqsRemote, resolveArgs) - } - } - - // The local batch goes directly to Raft. - var wg sync.WaitGroup - if len(baLocal.Requests) > 0 { - action := func() *roachpb.Error { - // Trace this under the ID of the intent owner. - sp := r.store.Tracer().StartSpan("resolve intents") - defer sp.Finish() - ctx = opentracing.ContextWithSpan(ctx, sp) - // Always operate with a timeout when resolving intents: this - // prevents rare shutdown timeouts in tests. 
- ctxWithTimeout, cancel := context.WithTimeout(ctx, base.NetworkTimeout) - defer cancel() - _, pErr := r.addWriteCmd(ctxWithTimeout, baLocal, &wg) - return pErr - } - wg.Add(1) - if wait || !r.store.Stopper().RunAsyncTask(func() { - if err := action(); err != nil { - log.Warningf("unable to resolve local intents; %s", err) - } - }) { - // Still run the task when draining. Our caller already has a task and - // going async here again is merely for performance, but some intents - // need to be resolved because they might block other tasks. See #1684. - // Note that handleSkippedIntents has a TODO in case #1684 comes back. - if err := action(); err != nil { - return err - } - } - } - - // Resolve all of the intents which aren't local to the Range. - if len(reqsRemote) > 0 { - b := &client.Batch{} - b.InternalAddRequest(reqsRemote...) - action := func() *roachpb.Error { - // TODO(tschottdorf): no tracing here yet. - return r.store.DB().Run(b) - } - if wait || !r.store.Stopper().RunAsyncTask(func() { - if err := action(); err != nil { - log.Warningf("unable to resolve external intents: %s", err) - } - }) { - // As with local intents, try async to not keep the caller waiting, but - // when draining just go ahead and do it synchronously. See #1684. - if err := action(); err != nil { - return err - } - } - } - - // Wait until the local ResolveIntents batch has been submitted to - // raft. No-op if all were non-local. - wg.Wait() - return nil -} - var errSystemConfigIntent = errors.New("must retry later due to intent on SystemConfigSpan") // loadSystemConfigSpan scans the entire SystemConfig span and returns the full diff --git a/storage/store.go b/storage/store.go index 9702082fe705..cf07afb8e2be 100644 --- a/storage/store.go +++ b/storage/store.go @@ -1649,112 +1649,6 @@ func (s *Store) Send(ctx context.Context, ba roachpb.BatchRequest) (br *roachpb. return nil, pErr } -// resolveWriteIntentError tries to push the conflicting transaction (if -// necessary, i.e. if the transaction is pending): either move its timestamp -// forward on a read/write conflict, or abort it on a write/write conflict. If -// the push succeeds (or if it wasn't necessary), the error's Resolved flag is -// set to to true and the caller should call resolveIntents and retry its -// command immediately. If the push fails, the error's Resolved flag is set to -// false so that the client backs off before reissuing the command. On -// write/write conflicts, a potential push error is returned; otherwise the -// updated WriteIntentError is returned. -// -// Callers are involved with -// a) conflict resolution for commands being executed at the Store with the -// client waiting, -// b) resolving intents encountered during inconsistent operations, and -// c) resolving intents upon EndTransaction which are not local to the given -// range. This is the only path in which the transaction is going to be -// in non-pending state and doesn't require a push. 
-func (s *Store) resolveWriteIntentError(ctx context.Context, wiErr *roachpb.WriteIntentError, rng *Replica, args roachpb.Request, h roachpb.Header, pushType roachpb.PushTxnType) ([]roachpb.Intent, *roachpb.Error) { - method := args.Method() - pusherTxn := h.Txn - readOnly := roachpb.IsReadOnly(args) // TODO(tschottdorf): pass as param - args = nil - - if log.V(6) { - log.Infoc(ctx, "resolving write intent %s", wiErr) - } - sp, cleanupSp := tracing.SpanFromContext(opStore, s.Tracer(), ctx) - defer cleanupSp() - sp.LogEvent("intent resolution") - - // Split intents into those we need to push and those which are good to - // resolve. - // TODO(tschottdorf): can optimize this and use same underlying slice. - var pushIntents, resolveIntents []roachpb.Intent - for _, intent := range wiErr.Intents { - // The current intent does not need conflict resolution. - if intent.Status != roachpb.PENDING { - resolveIntents = append(resolveIntents, intent) - } else { - pushIntents = append(pushIntents, intent) - } - } - - // Attempt to push the transaction(s) which created the conflicting intent(s). - now := s.Clock().Now() - - // TODO(tschottdorf): need deduplication here (many pushes for the same - // txn are awkward but even worse, could ratchet up the priority). - // If there's no pusher, we communicate a priority by sending an empty - // txn with only the priority set. - if pusherTxn == nil { - pusherTxn = &roachpb.Transaction{ - Priority: roachpb.MakePriority(h.UserPriority), - } - } - var pushReqs []roachpb.Request - for _, intent := range pushIntents { - pushReqs = append(pushReqs, &roachpb.PushTxnRequest{ - Span: roachpb.Span{ - Key: intent.Txn.Key, - }, - PusherTxn: *pusherTxn, - PusheeTxn: intent.Txn, - PushTo: h.Timestamp, - // The timestamp is used by PushTxn for figuring out whether the - // transaction is abandoned. If we used the argument's timestamp - // here, we would run into busy loops because that timestamp - // usually stays fixed among retries, so it will never realize - // that a transaction has timed out. See #877. - Now: now, - PushType: pushType, - }) - } - // TODO(kaneda): Set the transaction in the header so that the - // txn is correctly propagated in an error response. - b := &client.Batch{} - b.InternalAddRequest(pushReqs...) - br, pushErr := s.db.RunWithResponse(b) - if pushErr != nil { - if log.V(1) { - log.Infoc(ctx, "on %s: %s", method, pushErr) - } - - // For write/write conflicts within a transaction, propagate the - // push failure, not the original write intent error. The push - // failure will instruct the client to restart the transaction - // with a backoff. - if pusherTxn.ID != nil && !readOnly { - return nil, pushErr - } - // For read/write conflicts, return the write intent error which - // engages backoff/retry (with !Resolved). We don't need to - // restart the txn, only resend the read with a backoff. - return nil, roachpb.NewError(wiErr) - } - wiErr.Resolved = true // success! - - for i, intent := range pushIntents { - pushee := br.Responses[i].GetInner().(*roachpb.PushTxnResponse).PusheeTxn - intent.Txn = pushee.TxnMeta - intent.Status = pushee.Status - resolveIntents = append(resolveIntents, intent) - } - return resolveIntents, roachpb.NewError(wiErr) -} - // enqueueRaftMessage enqueues a request for eventual processing. It // returns an error to conform to the RPC interface but it always // returns nil without waiting for the message to be processed. 
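The core of the moved resolveWriteIntentError is a partitioning step: intents whose transactions are already finalized can be resolved directly, while PENDING ones first need a PushTxn. A minimal, self-contained Go sketch of that step follows; TxnStatus and Intent here are simplified stand-ins for the roachpb definitions, not the real API.

package main

import "fmt"

// Simplified stand-ins for the roachpb types; fields and names are
// illustrative only.
type TxnStatus int

const (
	PENDING TxnStatus = iota
	COMMITTED
	ABORTED
)

type Intent struct {
	Key    string
	Status TxnStatus
}

// splitIntents mirrors the partitioning in resolveWriteIntentError:
// intents of no-longer-PENDING transactions are ready to resolve,
// while PENDING ones must first be pushed via PushTxn.
func splitIntents(intents []Intent) (push, resolve []Intent) {
	for _, intent := range intents {
		if intent.Status != PENDING {
			resolve = append(resolve, intent)
		} else {
			push = append(push, intent)
		}
	}
	return push, resolve
}

func main() {
	intents := []Intent{
		{Key: "a", Status: PENDING},
		{Key: "b", Status: COMMITTED},
		{Key: "c", Status: ABORTED},
	}
	push, resolve := splitIntents(intents)
	fmt.Printf("push=%v resolve=%v\n", push, resolve)
}

Only the push group then gets PushTxnRequests, and those carry the store clock's Now rather than the command's timestamp so that abandoned transactions are eventually recognized across retries (see the #877 comment in the patch above).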
From a159938dbb3cb9c13fb238e96c95529959cd2631 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Sun, 13 Mar 2016 18:36:11 -0400 Subject: [PATCH 2/9] storage: move intent resolution methods to a new type --- storage/gc_queue.go | 6 +++--- storage/intent_resolver.go | 28 +++++++++++++++++++--------- storage/replica.go | 6 +++--- storage/replica_test.go | 2 +- storage/store.go | 6 ++++-- 5 files changed, 30 insertions(+), 18 deletions(-) diff --git a/storage/gc_queue.go b/storage/gc_queue.go index cd75a5600ff1..7c2336c45251 100644 --- a/storage/gc_queue.go +++ b/storage/gc_queue.go @@ -283,7 +283,7 @@ func (gcq *gcQueue) process(now roachpb.Timestamp, repl *Replica, } } - if pErr := repl.resolveIntents(repl.context(), intents, true /* wait */, false /* !poison */); pErr != nil { + if pErr := repl.store.intentResolver.resolveIntents(repl.context(), repl, intents, true /* wait */, false /* !poison */); pErr != nil { return pErr.GoError() } @@ -346,14 +346,14 @@ func processTransactionTable(r *Replica, txnMap map[uuid.UUID]*roachpb.Transacti // Note: Most aborted transaction weren't aborted by their client, // but instead by the coordinator - those will not have any intents // persisted, though they still might exist in the system. - if err := r.resolveIntents(r.context(), + if err := r.store.intentResolver.resolveIntents(r.context(), r, roachpb.AsIntents(txn.Intents, &txn), true /* wait */, false /* !poison */); err != nil { log.Warningf("failed to resolve intents of aborted txn on gc: %s", err) } case roachpb.COMMITTED: // It's committed, so it doesn't need a push but we can only // GC it after its intents are resolved. - if err := r.resolveIntents(r.context(), + if err := r.store.intentResolver.resolveIntents(r.context(), r, roachpb.AsIntents(txn.Intents, &txn), true /* wait */, false /* !poison */); err != nil { log.Warningf("unable to resolve intents of committed txn on gc: %s", err) // Returning the error here would abort the whole GC run, and diff --git a/storage/intent_resolver.go b/storage/intent_resolver.go index 8004dd9c44f3..77b1fa1dd70d 100644 --- a/storage/intent_resolver.go +++ b/storage/intent_resolver.go @@ -32,6 +32,16 @@ import ( "golang.org/x/net/context" ) +// intentResolver manages the process of pushing transactions and +// resolving intents. +type intentResolver struct { + store *Store +} + +func newIntentResolver(store *Store) *intentResolver { + return &intentResolver{store} +} + // resolveWriteIntentError tries to push the conflicting transaction (if // necessary, i.e. if the transaction is pending): either move its timestamp // forward on a read/write conflict, or abort it on a write/write conflict. If @@ -49,7 +59,7 @@ import ( // c) resolving intents upon EndTransaction which are not local to the given // range. This is the only path in which the transaction is going to be // in non-pending state and doesn't require a push. 
-func (s *Store) resolveWriteIntentError(ctx context.Context, wiErr *roachpb.WriteIntentError, rng *Replica, args roachpb.Request, h roachpb.Header, pushType roachpb.PushTxnType) ([]roachpb.Intent, *roachpb.Error) { +func (ir *intentResolver) resolveWriteIntentError(ctx context.Context, wiErr *roachpb.WriteIntentError, rng *Replica, args roachpb.Request, h roachpb.Header, pushType roachpb.PushTxnType) ([]roachpb.Intent, *roachpb.Error) { method := args.Method() pusherTxn := h.Txn readOnly := roachpb.IsReadOnly(args) // TODO(tschottdorf): pass as param @@ -58,7 +68,7 @@ func (s *Store) resolveWriteIntentError(ctx context.Context, wiErr *roachpb.Writ if log.V(6) { log.Infoc(ctx, "resolving write intent %s", wiErr) } - sp, cleanupSp := tracing.SpanFromContext(opStore, s.Tracer(), ctx) + sp, cleanupSp := tracing.SpanFromContext(opStore, ir.store.Tracer(), ctx) defer cleanupSp() sp.LogEvent("intent resolution") @@ -76,7 +86,7 @@ func (s *Store) resolveWriteIntentError(ctx context.Context, wiErr *roachpb.Writ } // Attempt to push the transaction(s) which created the conflicting intent(s). - now := s.Clock().Now() + now := ir.store.Clock().Now() // TODO(tschottdorf): need deduplication here (many pushes for the same // txn are awkward but even worse, could ratchet up the priority). @@ -109,7 +119,7 @@ func (s *Store) resolveWriteIntentError(ctx context.Context, wiErr *roachpb.Writ // txn is correctly propagated in an error response. b := &client.Batch{} b.InternalAddRequest(pushReqs...) - br, pushErr := s.db.RunWithResponse(b) + br, pushErr := ir.store.db.RunWithResponse(b) if pushErr != nil { if log.V(1) { log.Infoc(ctx, "on %s: %s", method, pushErr) @@ -138,7 +148,7 @@ func (s *Store) resolveWriteIntentError(ctx context.Context, wiErr *roachpb.Writ return resolveIntents, roachpb.NewError(wiErr) } -func (r *Replica) handleSkippedIntents(intents []intentsWithArg) { +func (ir *intentResolver) handleSkippedIntents(r *Replica, intents []intentsWithArg) { if len(intents) == 0 { return } @@ -159,14 +169,14 @@ func (r *Replica) handleSkippedIntents(intents []intentsWithArg) { ctxWithTimeout, cancel := context.WithTimeout(ctx, base.NetworkTimeout) defer cancel() h := roachpb.Header{Timestamp: now} - resolveIntents, pErr := r.store.resolveWriteIntentError(ctxWithTimeout, &roachpb.WriteIntentError{ + resolveIntents, pErr := ir.resolveWriteIntentError(ctxWithTimeout, &roachpb.WriteIntentError{ Intents: item.intents, }, r, args, h, roachpb.PUSH_TOUCH) if wiErr, ok := pErr.GetDetail().(*roachpb.WriteIntentError); !ok || wiErr == nil || !wiErr.Resolved { log.Warningc(ctxWithTimeout, "failed to push during intent resolution: %s", pErr) return } - if pErr := r.resolveIntents(ctxWithTimeout, resolveIntents, true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { + if pErr := ir.resolveIntents(ctxWithTimeout, r, resolveIntents, true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { log.Warningc(ctxWithTimeout, "failed to resolve intents: %s", pErr) return } @@ -226,8 +236,8 @@ func (r *Replica) handleSkippedIntents(intents []intentsWithArg) { // commands have been **proposed** (not executed). This ensures that if a // waiting client retries immediately after calling this function, it will not // hit the same intents again. 
-func (r *Replica) resolveIntents(ctx context.Context, intents []roachpb.Intent, wait bool, poison bool) *roachpb.Error { - sp, cleanupSp := tracing.SpanFromContext(opReplica, r.store.Tracer(), ctx) +func (ir *intentResolver) resolveIntents(ctx context.Context, r *Replica, intents []roachpb.Intent, wait bool, poison bool) *roachpb.Error { + sp, cleanupSp := tracing.SpanFromContext(opReplica, ir.store.Tracer(), ctx) defer cleanupSp() ctx = opentracing.ContextWithSpan(ctx, nil) // we're doing async stuff below; those need new traces diff --git a/storage/replica.go b/storage/replica.go index 48f604d33aa9..629876724333 100644 --- a/storage/replica.go +++ b/storage/replica.go @@ -929,7 +929,7 @@ func (r *Replica) addReadOnlyCmd(ctx context.Context, ba roachpb.BatchRequest) ( // conditions as described in #2231. pErr = r.checkSequenceCache(r.store.Engine(), *ba.Txn) } - r.handleSkippedIntents(intents) + r.store.intentResolver.handleSkippedIntents(r, intents) return br, pErr } @@ -1442,7 +1442,7 @@ func (r *Replica) applyRaftCommand(ctx context.Context, index uint64, originRepl // On the replica on which this command originated, resolve skipped intents // asynchronously - even on failure. if originReplica.StoreID == r.store.StoreID() { - r.handleSkippedIntents(intents) + r.store.intentResolver.handleSkippedIntents(r, intents) } return br, rErr @@ -1875,7 +1875,7 @@ func (r *Replica) loadSystemConfigSpan() ([]roachpb.KeyValue, []byte, error) { // There were intents, so what we read may not be consistent. Attempt // to nudge the intents in case they're expired; next time around we'll // hopefully have more luck. - r.handleSkippedIntents(intents) + r.store.intentResolver.handleSkippedIntents(r, intents) return nil, nil, errSystemConfigIntent } kvs := br.Responses[0].GetInner().(*roachpb.ScanResponse).Rows diff --git a/storage/replica_test.go b/storage/replica_test.go index 324fe78edd9d..e1445dde6d08 100644 --- a/storage/replica_test.go +++ b/storage/replica_test.go @@ -2286,7 +2286,7 @@ func TestReplicaResolveIntentNoWait(t *testing.T) { setupResolutionTest(t, tc, roachpb.Key("a") /* irrelevant */, splitKey) txn := newTransaction("name", key, 1, roachpb.SERIALIZABLE, tc.clock) txn.Status = roachpb.COMMITTED - if pErr := tc.rng.resolveIntents(context.Background(), []roachpb.Intent{{ + if pErr := tc.store.intentResolver.resolveIntents(context.Background(), tc.rng, []roachpb.Intent{{ Span: roachpb.Span{Key: key}, Txn: txn.TxnMeta, Status: txn.Status, diff --git a/storage/store.go b/storage/store.go index cf07afb8e2be..b4ba48b0c176 100644 --- a/storage/store.go +++ b/storage/store.go @@ -268,6 +268,7 @@ type Store struct { replicaConsistencyQueue *replicaConsistencyQueue // Replica consistency check queue consistencyScanner *replicaScanner // Consistency checker scanner metrics *storeMetrics + intentResolver *intentResolver wakeRaftLoop chan struct{} started int32 stopper *stop.Stopper @@ -536,6 +537,7 @@ func NewStore(ctx StoreContext, eng engine.Engine, nodeDesc *roachpb.NodeDescrip raftRequestChan: make(chan *RaftMessageRequest, raftReqBufferSize), metrics: newStoreMetrics(), } + s.intentResolver = newIntentResolver(s) s.mu.Lock() s.mu.replicas = map[roachpb.RangeID]*Replica{} @@ -1587,9 +1589,9 @@ func (s *Store) Send(ctx context.Context, ba roachpb.BatchRequest) (br *roachpb. // after our operation started. This allows us to not have to // restart for uncertainty as we come back and read. 
h.Timestamp.Forward(now) - resolveIntents, pErrPush := s.resolveWriteIntentError(ctx, wiErr, rng, args, h, pushType) + resolveIntents, pErrPush := s.intentResolver.resolveWriteIntentError(ctx, wiErr, rng, args, h, pushType) if len(resolveIntents) > 0 { - if resErr := rng.resolveIntents(ctx, resolveIntents, false /* !wait */, true /* poison */); resErr != nil { + if resErr := s.intentResolver.resolveIntents(ctx, rng, resolveIntents, false /* !wait */, true /* poison */); resErr != nil { // When resolving asynchronously, errors should not // usually be returned here, although there are some cases // when they may be (especially when a test cluster is in From e11948892bf6c27d93140c39c93ecdefdacc14a0 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Sun, 13 Mar 2016 19:47:06 -0400 Subject: [PATCH 3/9] storage: Minor API cleanups in intentResolver. Rename some methods, clarify comments, and other minor API updates (remove unused arg and avoid mutating the passed-in error). --- storage/intent_resolver.go | 68 +++++++++++++++++++++++--------------- storage/replica.go | 6 ++-- storage/store.go | 2 +- 3 files changed, 45 insertions(+), 31 deletions(-) diff --git a/storage/intent_resolver.go b/storage/intent_resolver.go index 77b1fa1dd70d..89163f2c8b69 100644 --- a/storage/intent_resolver.go +++ b/storage/intent_resolver.go @@ -42,15 +42,23 @@ func newIntentResolver(store *Store) *intentResolver { return &intentResolver{store} } -// resolveWriteIntentError tries to push the conflicting transaction (if -// necessary, i.e. if the transaction is pending): either move its timestamp -// forward on a read/write conflict, or abort it on a write/write conflict. If -// the push succeeds (or if it wasn't necessary), the error's Resolved flag is -// set to to true and the caller should call resolveIntents and retry its -// command immediately. If the push fails, the error's Resolved flag is set to -// false so that the client backs off before reissuing the command. On -// write/write conflicts, a potential push error is returned; otherwise the -// updated WriteIntentError is returned. +// maybePushTransaction tries to push the conflicting transaction(s) +// responsible for the given WriteIntentError: either move its +// timestamp forward on a read/write conflict, abort it on a +// write/write conflict, or do nothing if the transaction is no longer +// pending. +// +// Returns a slice of intents which can now be resolved and an error +// which should be returned to the client in place of the original +// WriteIntentError. The returned intents should be resolved via +// intentResolver.resolveIntents regardless of any error returned by +// maybePushTransaction. +// +// The returned error may be a copy of the original WriteIntentError, +// with or without the Resolved flag set, which governs the client's +// retry behavior. (if the transaction is pushed, the Resolved flag is +// set to tell the client to retry immediately; otherwise it is false +// to cause the client to back off). // // Callers are involved with // a) conflict resolution for commands being executed at the Store with the @@ -59,7 +67,7 @@ func newIntentResolver(store *Store) *intentResolver { // c) resolving intents upon EndTransaction which are not local to the given // range. This is the only path in which the transaction is going to be // in non-pending state and doesn't require a push. 
-func (ir *intentResolver) resolveWriteIntentError(ctx context.Context, wiErr *roachpb.WriteIntentError, rng *Replica, args roachpb.Request, h roachpb.Header, pushType roachpb.PushTxnType) ([]roachpb.Intent, *roachpb.Error) { +func (ir *intentResolver) maybePushTransactions(ctx context.Context, wiErr roachpb.WriteIntentError, args roachpb.Request, h roachpb.Header, pushType roachpb.PushTxnType) ([]roachpb.Intent, *roachpb.Error) { method := args.Method() pusherTxn := h.Txn readOnly := roachpb.IsReadOnly(args) // TODO(tschottdorf): pass as param @@ -135,7 +143,7 @@ func (ir *intentResolver) resolveWriteIntentError(ctx context.Context, wiErr *ro // For read/write conflicts, return the write intent error which // engages backoff/retry (with !Resolved). We don't need to // restart the txn, only resend the read with a backoff. - return nil, roachpb.NewError(wiErr) + return nil, roachpb.NewError(&wiErr) } wiErr.Resolved = true // success! @@ -145,10 +153,15 @@ func (ir *intentResolver) resolveWriteIntentError(ctx context.Context, wiErr *ro intent.Status = pushee.Status resolveIntents = append(resolveIntents, intent) } - return resolveIntents, roachpb.NewError(wiErr) + return resolveIntents, roachpb.NewError(&wiErr) } -func (ir *intentResolver) handleSkippedIntents(r *Replica, intents []intentsWithArg) { +// processIntentsAsync asynchronously processes intents which were +// encountered during another command but did not interfere with the +// execution of that command. This occurs in two cases: inconsistent +// reads and EndTransaction (which queues its own intents for +// processing via this method). +func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithArg) { if len(intents) == 0 { return } @@ -169,17 +182,17 @@ func (ir *intentResolver) handleSkippedIntents(r *Replica, intents []intentsWith ctxWithTimeout, cancel := context.WithTimeout(ctx, base.NetworkTimeout) defer cancel() h := roachpb.Header{Timestamp: now} - resolveIntents, pErr := ir.resolveWriteIntentError(ctxWithTimeout, &roachpb.WriteIntentError{ + resolveIntents, pushErr := ir.maybePushTransactions(ctxWithTimeout, roachpb.WriteIntentError{ Intents: item.intents, - }, r, args, h, roachpb.PUSH_TOUCH) - if wiErr, ok := pErr.GetDetail().(*roachpb.WriteIntentError); !ok || wiErr == nil || !wiErr.Resolved { - log.Warningc(ctxWithTimeout, "failed to push during intent resolution: %s", pErr) - return - } + }, args, h, roachpb.PUSH_TOUCH) if pErr := ir.resolveIntents(ctxWithTimeout, r, resolveIntents, true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { log.Warningc(ctxWithTimeout, "failed to resolve intents: %s", pErr) return } + if wiErr, ok := pushErr.GetDetail().(*roachpb.WriteIntentError); !ok || wiErr == nil || !wiErr.Resolved { + log.Warningc(ctxWithTimeout, "failed to push during intent resolution: %s", pushErr) + return + } // We successfully resolved the intents, so we're able to GC from // the txn span directly. Note that the sequence cache was cleared // out synchronously with EndTransaction (see comments within for @@ -228,14 +241,15 @@ func (ir *intentResolver) handleSkippedIntents(r *Replica, intents []intentsWith } } -// resolveIntents resolves the given intents. For those which are local to the -// range, we submit directly to the range-local Raft instance; all non-local -// intents are resolved asynchronously in a batch. If `wait` is true, all -// operations are carried out synchronously and an error is returned. 
-// Otherwise, the call returns without error as soon as all local resolve -// commands have been **proposed** (not executed). This ensures that if a -// waiting client retries immediately after calling this function, it will not -// hit the same intents again. +// resolveIntents resolves the given intents. For those which are +// local to the range, we submit directly to the local Raft instance; +// all non-local intents are resolved asynchronously in a batch. If +// `wait` is true, all operations are carried out synchronously and an +// error is returned. Otherwise, the call returns without error as +// soon as all local resolve commands have been **proposed** (not +// executed). This ensures that if a waiting client retries +// immediately after calling this function, it will not hit the same +// intents again. func (ir *intentResolver) resolveIntents(ctx context.Context, r *Replica, intents []roachpb.Intent, wait bool, poison bool) *roachpb.Error { sp, cleanupSp := tracing.SpanFromContext(opReplica, ir.store.Tracer(), ctx) defer cleanupSp() diff --git a/storage/replica.go b/storage/replica.go index 629876724333..da1798fc2516 100644 --- a/storage/replica.go +++ b/storage/replica.go @@ -929,7 +929,7 @@ func (r *Replica) addReadOnlyCmd(ctx context.Context, ba roachpb.BatchRequest) ( // conditions as described in #2231. pErr = r.checkSequenceCache(r.store.Engine(), *ba.Txn) } - r.store.intentResolver.handleSkippedIntents(r, intents) + r.store.intentResolver.processIntentsAsync(r, intents) return br, pErr } @@ -1442,7 +1442,7 @@ func (r *Replica) applyRaftCommand(ctx context.Context, index uint64, originRepl // On the replica on which this command originated, resolve skipped intents // asynchronously - even on failure. if originReplica.StoreID == r.store.StoreID() { - r.store.intentResolver.handleSkippedIntents(r, intents) + r.store.intentResolver.processIntentsAsync(r, intents) } return br, rErr @@ -1875,7 +1875,7 @@ func (r *Replica) loadSystemConfigSpan() ([]roachpb.KeyValue, []byte, error) { // There were intents, so what we read may not be consistent. Attempt // to nudge the intents in case they're expired; next time around we'll // hopefully have more luck. - r.store.intentResolver.handleSkippedIntents(r, intents) + r.store.intentResolver.processIntentsAsync(r, intents) return nil, nil, errSystemConfigIntent } kvs := br.Responses[0].GetInner().(*roachpb.ScanResponse).Rows diff --git a/storage/store.go b/storage/store.go index b4ba48b0c176..9326491d26ef 100644 --- a/storage/store.go +++ b/storage/store.go @@ -1589,7 +1589,7 @@ func (s *Store) Send(ctx context.Context, ba roachpb.BatchRequest) (br *roachpb. // after our operation started. This allows us to not have to // restart for uncertainty as we come back and read. 
h.Timestamp.Forward(now) - resolveIntents, pErrPush := s.intentResolver.resolveWriteIntentError(ctx, wiErr, rng, args, h, pushType) + resolveIntents, pErrPush := s.intentResolver.maybePushTransactions(ctx, *wiErr, args, h, pushType) if len(resolveIntents) > 0 { if resErr := s.intentResolver.resolveIntents(ctx, rng, resolveIntents, false /* !wait */, true /* poison */); resErr != nil { // When resolving asynchronously, errors should not From 2f16ee122a7b65c3ec530d9dc7894f06ab70e929 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Sun, 13 Mar 2016 22:19:41 -0400 Subject: [PATCH 4/9] storage: Separate intent push from WriteIntentError manipulation --- storage/intent_resolver.go | 110 +++++++++++++++++++++++-------------- storage/store.go | 11 +--- 2 files changed, 69 insertions(+), 52 deletions(-) diff --git a/storage/intent_resolver.go b/storage/intent_resolver.go index 89163f2c8b69..d705b07f3c98 100644 --- a/storage/intent_resolver.go +++ b/storage/intent_resolver.go @@ -42,23 +42,70 @@ func newIntentResolver(store *Store) *intentResolver { return &intentResolver{store} } -// maybePushTransaction tries to push the conflicting transaction(s) -// responsible for the given WriteIntentError: either move its -// timestamp forward on a read/write conflict, abort it on a -// write/write conflict, or do nothing if the transaction is no longer -// pending. -// -// Returns a slice of intents which can now be resolved and an error -// which should be returned to the client in place of the original -// WriteIntentError. The returned intents should be resolved via -// intentResolver.resolveIntents regardless of any error returned by -// maybePushTransaction. +// processWriteIntentError tries to push the conflicting +// transaction(s) responsible for the given WriteIntentError, and to +// resolve those intents if possible. Returns a new error to be used +// in place of the original. // // The returned error may be a copy of the original WriteIntentError, // with or without the Resolved flag set, which governs the client's // retry behavior. (if the transaction is pushed, the Resolved flag is // set to tell the client to retry immediately; otherwise it is false // to cause the client to back off). +func (ir *intentResolver) processWriteIntentError(ctx context.Context, wiErr roachpb.WriteIntentError, r *Replica, args roachpb.Request, h roachpb.Header, pushType roachpb.PushTxnType) *roachpb.Error { + if log.V(6) { + log.Infoc(ctx, "resolving write intent %s", wiErr) + } + + method := args.Method() + readOnly := roachpb.IsReadOnly(args) // TODO(tschottdorf): pass as param + + resolveIntents, pushErr := ir.maybePushTransactions(ctx, wiErr.Intents, h, pushType) + if resErr := ir.resolveIntents(ctx, r, resolveIntents, false /* !wait */, true /* poison */); resErr != nil { + // When resolving without waiting, errors should not + // usually be returned here, although there are some cases + // when they may be (especially when a test cluster is in + // the process of shutting down). + log.Warningf("asynchronous resolveIntents failed: %s", resErr) + } + + if pushErr != nil { + if log.V(1) { + log.Infoc(ctx, "on %s: %s", method, pushErr) + } + + // For write/write conflicts within a transaction, propagate the + // push failure, not the original write intent error. The push + // failure will instruct the client to restart the transaction + // with a backoff. 
+ if h.Txn != nil && h.Txn.ID != nil && !readOnly { + return pushErr + } + + // For read/write conflicts, and non-transactional write/write + // conflicts, return the write intent error which engages + // backoff/retry (with !Resolved). We don't need to restart the + // txn, only resend the read with a backoff. + return roachpb.NewError(&wiErr) + } + + // We pushed all transactions, so tell the client everything's + // resolved and it can retry immediately. + wiErr.Resolved = true + return roachpb.NewError(&wiErr) +} + +// maybePushTransaction tries to push the conflicting transaction(s) +// responsible for the given intents: either move its +// timestamp forward on a read/write conflict, abort it on a +// write/write conflict, or do nothing if the transaction is no longer +// pending. +// +// Returns a slice of intents which can now be resolved, and an error. +// The returned intents should be resolved via +// intentResolver.resolveIntents regardless of any error returned by +// maybePushTransaction, but if the error is non-nil then some of the +// conflicting transactions may still be pending. // // Callers are involved with // a) conflict resolution for commands being executed at the Store with the @@ -67,15 +114,9 @@ func newIntentResolver(store *Store) *intentResolver { // c) resolving intents upon EndTransaction which are not local to the given // range. This is the only path in which the transaction is going to be // in non-pending state and doesn't require a push. -func (ir *intentResolver) maybePushTransactions(ctx context.Context, wiErr roachpb.WriteIntentError, args roachpb.Request, h roachpb.Header, pushType roachpb.PushTxnType) ([]roachpb.Intent, *roachpb.Error) { - method := args.Method() +func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []roachpb.Intent, h roachpb.Header, pushType roachpb.PushTxnType) ([]roachpb.Intent, *roachpb.Error) { pusherTxn := h.Txn - readOnly := roachpb.IsReadOnly(args) // TODO(tschottdorf): pass as param - args = nil - if log.V(6) { - log.Infoc(ctx, "resolving write intent %s", wiErr) - } sp, cleanupSp := tracing.SpanFromContext(opStore, ir.store.Tracer(), ctx) defer cleanupSp() sp.LogEvent("intent resolution") @@ -84,7 +125,7 @@ func (ir *intentResolver) maybePushTransactions(ctx context.Context, wiErr roach // resolve. // TODO(tschottdorf): can optimize this and use same underlying slice. var pushIntents, resolveIntents []roachpb.Intent - for _, intent := range wiErr.Intents { + for _, intent := range intents { // The current intent does not need conflict resolution. if intent.Status != roachpb.PENDING { resolveIntents = append(resolveIntents, intent) @@ -127,25 +168,11 @@ func (ir *intentResolver) maybePushTransactions(ctx context.Context, wiErr roach // txn is correctly propagated in an error response. b := &client.Batch{} b.InternalAddRequest(pushReqs...) - br, pushErr := ir.store.db.RunWithResponse(b) - if pushErr != nil { - if log.V(1) { - log.Infoc(ctx, "on %s: %s", method, pushErr) - } - - // For write/write conflicts within a transaction, propagate the - // push failure, not the original write intent error. The push - // failure will instruct the client to restart the transaction - // with a backoff. - if pusherTxn.ID != nil && !readOnly { - return nil, pushErr - } - // For read/write conflicts, return the write intent error which - // engages backoff/retry (with !Resolved). We don't need to - // restart the txn, only resend the read with a backoff. 
- return nil, roachpb.NewError(&wiErr) + br, err := ir.store.db.RunWithResponse(b) + if err != nil { + // TODO(bdarnell): return resolveIntents even on error. + return nil, err } - wiErr.Resolved = true // success! for i, intent := range pushIntents { pushee := br.Responses[i].GetInner().(*roachpb.PushTxnResponse).PusheeTxn @@ -153,7 +180,7 @@ func (ir *intentResolver) maybePushTransactions(ctx context.Context, wiErr roach intent.Status = pushee.Status resolveIntents = append(resolveIntents, intent) } - return resolveIntents, roachpb.NewError(&wiErr) + return resolveIntents, nil } // processIntentsAsync asynchronously processes intents which were @@ -182,14 +209,13 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA ctxWithTimeout, cancel := context.WithTimeout(ctx, base.NetworkTimeout) defer cancel() h := roachpb.Header{Timestamp: now} - resolveIntents, pushErr := ir.maybePushTransactions(ctxWithTimeout, roachpb.WriteIntentError{ - Intents: item.intents, - }, args, h, roachpb.PUSH_TOUCH) + resolveIntents, pushErr := ir.maybePushTransactions(ctxWithTimeout, + item.intents, h, roachpb.PUSH_TOUCH) if pErr := ir.resolveIntents(ctxWithTimeout, r, resolveIntents, true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { log.Warningc(ctxWithTimeout, "failed to resolve intents: %s", pErr) return } - if wiErr, ok := pushErr.GetDetail().(*roachpb.WriteIntentError); !ok || wiErr == nil || !wiErr.Resolved { + if pushErr != nil { log.Warningc(ctxWithTimeout, "failed to push during intent resolution: %s", pushErr) return } diff --git a/storage/store.go b/storage/store.go index 9326491d26ef..8c4614153e23 100644 --- a/storage/store.go +++ b/storage/store.go @@ -1589,16 +1589,7 @@ func (s *Store) Send(ctx context.Context, ba roachpb.BatchRequest) (br *roachpb. // after our operation started. This allows us to not have to // restart for uncertainty as we come back and read. h.Timestamp.Forward(now) - resolveIntents, pErrPush := s.intentResolver.maybePushTransactions(ctx, *wiErr, args, h, pushType) - if len(resolveIntents) > 0 { - if resErr := s.intentResolver.resolveIntents(ctx, rng, resolveIntents, false /* !wait */, true /* poison */); resErr != nil { - // When resolving asynchronously, errors should not - // usually be returned here, although there are some cases - // when they may be (especially when a test cluster is in - // the process of shutting down). - log.Warningf("asynchronous resolveIntents failed: %s", resErr) - } - } + pErrPush := s.intentResolver.processWriteIntentError(ctx, *wiErr, rng, args, h, pushType) // Preserve the error index. 
oldIndex := pErr.Index pErr = pErrPush From 76f0dd645c00deb35c266ef0e5c8a529d698f32d Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Sun, 13 Mar 2016 22:34:28 -0400 Subject: [PATCH 5/9] storage: split EndTransaction from other uses of processIntentsAsync --- storage/intent_resolver.go | 120 ++++++++++++++++++++----------------- 1 file changed, 65 insertions(+), 55 deletions(-) diff --git a/storage/intent_resolver.go b/storage/intent_resolver.go index d705b07f3c98..0f372062fb52 100644 --- a/storage/intent_resolver.go +++ b/storage/intent_resolver.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/client" "github.com/cockroachdb/cockroach/keys" "github.com/cockroachdb/cockroach/roachpb" - "github.com/cockroachdb/cockroach/util" "github.com/cockroachdb/cockroach/util/log" "github.com/cockroachdb/cockroach/util/tracing" "github.com/opentracing/opentracing-go" @@ -187,7 +186,9 @@ func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []r // encountered during another command but did not interfere with the // execution of that command. This occurs in two cases: inconsistent // reads and EndTransaction (which queues its own intents for -// processing via this method). +// processing via this method). The two cases are handled somewhat +// differently and would be better served by different entry points, +// but combining them simplifies the plumbing necessary in Replica. func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithArg) { if len(intents) == 0 { return @@ -197,57 +198,66 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA stopper := r.store.Stopper() for _, item := range intents { - // TODO(tschottdorf): avoid data race related to batch unrolling in ExecuteCmd; - // can probably go again when that provisional code there is gone. Should - // still be careful though, a retry could happen and race with args. - args := util.CloneProto(item.args).(roachpb.Request) - stopper.RunAsyncTask(func() { - // Everything here is best effort; give up rather than waiting - // too long (helps avoid deadlocks during test shutdown, - // although this is imperfect due to the use of an - // uninterruptible WaitGroup.Wait in beginCmds). - ctxWithTimeout, cancel := context.WithTimeout(ctx, base.NetworkTimeout) - defer cancel() - h := roachpb.Header{Timestamp: now} - resolveIntents, pushErr := ir.maybePushTransactions(ctxWithTimeout, - item.intents, h, roachpb.PUSH_TOUCH) - if pErr := ir.resolveIntents(ctxWithTimeout, r, resolveIntents, true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { - log.Warningc(ctxWithTimeout, "failed to resolve intents: %s", pErr) - return - } - if pushErr != nil { - log.Warningc(ctxWithTimeout, "failed to push during intent resolution: %s", pushErr) - return - } - // We successfully resolved the intents, so we're able to GC from - // the txn span directly. Note that the sequence cache was cleared - // out synchronously with EndTransaction (see comments within for - // an explanation of why that is kosher). - // - // Note that we poisoned the sequence caches on the external ranges - // above. This may seem counter-intuitive, but it's actually - // necessary: Assume a transaction has committed here, with two - // external intents, and assume that we did not poison. Normally, - // these two intents would be resolved in the same batch, but that - // is not guaranteed (for example, if DistSender has a stale - // descriptor after a Merge). 
When resolved separately, the first - // ResolveIntent would clear out the sequence cache; an individual - // write on the second (still present) intent could then be - // replayed and would resolve to a real value (at least for a - // window of time unless we delete the local txn entry). That's not - // OK for non-idempotent commands such as Increment. - // TODO(tschottdorf): We should have another side effect on - // MVCCResolveIntent (on commit/abort): If it were able to remove - // the txn from its corresponding entries in the timestamp cache, - // no more replays at the same timestamp would be possible. This - // appears to be a useful performance optimization; we could then - // not poison on EndTransaction. In fact, the above mechanism - // could be an effective alternative to sequence-cache based - // poisoning (or the whole sequence cache?) itself. - // - // TODO(tschottdorf): down the road, can probably unclog the system - // here by batching up a bunch of those GCRequests before proposing. - if args.Method() == roachpb.EndTransaction { + if item.args.Method() != roachpb.EndTransaction { + stopper.RunAsyncTask(func() { + // Everything here is best effort; give up rather than waiting + // too long (helps avoid deadlocks during test shutdown, + // although this is imperfect due to the use of an + // uninterruptible WaitGroup.Wait in beginCmds). + ctxWithTimeout, cancel := context.WithTimeout(ctx, base.NetworkTimeout) + defer cancel() + h := roachpb.Header{Timestamp: now} + resolveIntents, pushErr := ir.maybePushTransactions(ctxWithTimeout, + item.intents, h, roachpb.PUSH_TOUCH) + if pErr := ir.resolveIntents(ctxWithTimeout, r, resolveIntents, true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { + log.Warningc(ctxWithTimeout, "failed to resolve intents: %s", pErr) + return + } + if pushErr != nil { + log.Warningc(ctxWithTimeout, "failed to push during intent resolution: %s", pushErr) + return + } + }) + } else { // EndTransaction + stopper.RunAsyncTask(func() { + ctxWithTimeout, cancel := context.WithTimeout(ctx, base.NetworkTimeout) + defer cancel() + + // For EndTransaction, we know the transaction is finalized so + // we can skip the push and go straight to the resolve. + if pErr := ir.resolveIntents(ctxWithTimeout, r, item.intents, true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { + log.Warningc(ctxWithTimeout, "failed to resolve intents: %s", pErr) + return + } + + // We successfully resolved the intents, so we're able to GC from + // the txn span directly. Note that the sequence cache was cleared + // out synchronously with EndTransaction (see comments within for + // an explanation of why that is kosher). + // + // Note that we poisoned the sequence caches on the external ranges + // above. This may seem counter-intuitive, but it's actually + // necessary: Assume a transaction has committed here, with two + // external intents, and assume that we did not poison. Normally, + // these two intents would be resolved in the same batch, but that + // is not guaranteed (for example, if DistSender has a stale + // descriptor after a Merge). When resolved separately, the first + // ResolveIntent would clear out the sequence cache; an individual + // write on the second (still present) intent could then be + // replayed and would resolve to a real value (at least for a + // window of time unless we delete the local txn entry). That's not + // OK for non-idempotent commands such as Increment. 
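(As a concrete, self-contained illustration of the non-idempotency the comment above relies on — a replayed Increment is observed twice, unlike a replayed blind write of the same value; the numbers are made up:)

package main

import "fmt"

func main() {
    // An Increment applied once and then replayed after its intent has
    // been resolved double-counts; there is no way to tell the replay
    // apart from a legitimate second increment.
    value := int64(10)
    delta := int64(5)
    value += delta     // original command
    value += delta     // replay against the already-resolved value
    fmt.Println(value) // prints 20; the intended result was 15
}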
+ // TODO(tschottdorf): We should have another side effect on + // MVCCResolveIntent (on commit/abort): If it were able to remove + // the txn from its corresponding entries in the timestamp cache, + // no more replays at the same timestamp would be possible. This + // appears to be a useful performance optimization; we could then + // not poison on EndTransaction. In fact, the above mechanism + // could be an effective alternative to sequence-cache based + // poisoning (or the whole sequence cache?) itself. + // + // TODO(tschottdorf): down the road, can probably unclog the system + // here by batching up a bunch of those GCRequests before proposing. var ba roachpb.BatchRequest txn := item.intents[0].Txn gcArgs := roachpb.GCRequest{ @@ -262,8 +272,8 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA if _, pErr := r.addWriteCmd(ctxWithTimeout, ba, nil /* nil */); pErr != nil { log.Warningf("could not GC completed transaction: %s", pErr) } - } - }) + }) + } } } From d1655b0b8fbf0ca32488ecd94e97668dbd011da4 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Sun, 13 Mar 2016 23:18:34 -0400 Subject: [PATCH 6/9] storage: Dedupe PushTxn calls from async intent resolution An explosion of these calls for the same transaction (or a small number of transactions) was responsible for #4925 Fixes #4925 --- storage/intent_resolver.go | 58 +++++++++++++++++++++++++++----------- 1 file changed, 42 insertions(+), 16 deletions(-) diff --git a/storage/intent_resolver.go b/storage/intent_resolver.go index 0f372062fb52..5edcbdf0af32 100644 --- a/storage/intent_resolver.go +++ b/storage/intent_resolver.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/roachpb" "github.com/cockroachdb/cockroach/util/log" "github.com/cockroachdb/cockroach/util/tracing" + "github.com/cockroachdb/cockroach/util/uuid" "github.com/opentracing/opentracing-go" "golang.org/x/net/context" ) @@ -35,10 +36,20 @@ import ( // resolving intents. type intentResolver struct { store *Store + + mu struct { + sync.Mutex + // Maps transaction ids to a refcount. + inFlight map[*uuid.UUID]int + } } func newIntentResolver(store *Store) *intentResolver { - return &intentResolver{store} + ir := &intentResolver{ + store: store, + } + ir.mu.inFlight = map[*uuid.UUID]int{} + return ir } // processWriteIntentError tries to push the conflicting @@ -59,7 +70,7 @@ func (ir *intentResolver) processWriteIntentError(ctx context.Context, wiErr roa method := args.Method() readOnly := roachpb.IsReadOnly(args) // TODO(tschottdorf): pass as param - resolveIntents, pushErr := ir.maybePushTransactions(ctx, wiErr.Intents, h, pushType) + resolveIntents, pushErr := ir.maybePushTransactions(ctx, wiErr.Intents, h, pushType, false) if resErr := ir.resolveIntents(ctx, r, resolveIntents, false /* !wait */, true /* poison */); resErr != nil { // When resolving without waiting, errors should not // usually be returned here, although there are some cases @@ -113,8 +124,17 @@ func (ir *intentResolver) processWriteIntentError(ctx context.Context, wiErr roa // c) resolving intents upon EndTransaction which are not local to the given // range. This is the only path in which the transaction is going to be // in non-pending state and doesn't require a push. 
-func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []roachpb.Intent, h roachpb.Header, pushType roachpb.PushTxnType) ([]roachpb.Intent, *roachpb.Error) { +func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []roachpb.Intent, h roachpb.Header, pushType roachpb.PushTxnType, skipInFlight bool) ([]roachpb.Intent, *roachpb.Error) { + now := ir.store.Clock().Now() + pusherTxn := h.Txn + // If there's no pusher, we communicate a priority by sending an empty + // txn with only the priority set. + if pusherTxn == nil { + pusherTxn = &roachpb.Transaction{ + Priority: roachpb.MakePriority(h.UserPriority), + } + } sp, cleanupSp := tracing.SpanFromContext(opStore, ir.store.Tracer(), ctx) defer cleanupSp() @@ -122,29 +142,27 @@ func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []r // Split intents into those we need to push and those which are good to // resolve. + ir.mu.Lock() // TODO(tschottdorf): can optimize this and use same underlying slice. var pushIntents, resolveIntents []roachpb.Intent for _, intent := range intents { - // The current intent does not need conflict resolution. if intent.Status != roachpb.PENDING { + // The current intent does not need conflict resolution + // because the transaction is already finalized. + // TODO(bdarnell): can this happen any more? resolveIntents = append(resolveIntents, intent) + } else if _, ok := ir.mu.inFlight[intent.Txn.ID]; ok && skipInFlight { + // Another goroutine is working on this transaction so we can + // skip it. + continue } else { pushIntents = append(pushIntents, intent) + ir.mu.inFlight[intent.Txn.ID]++ } } + ir.mu.Unlock() // Attempt to push the transaction(s) which created the conflicting intent(s). - now := ir.store.Clock().Now() - - // TODO(tschottdorf): need deduplication here (many pushes for the same - // txn are awkward but even worse, could ratchet up the priority). - // If there's no pusher, we communicate a priority by sending an empty - // txn with only the priority set. - if pusherTxn == nil { - pusherTxn = &roachpb.Transaction{ - Priority: roachpb.MakePriority(h.UserPriority), - } - } var pushReqs []roachpb.Request for _, intent := range pushIntents { pushReqs = append(pushReqs, &roachpb.PushTxnRequest{ @@ -168,6 +186,14 @@ func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []r b := &client.Batch{} b.InternalAddRequest(pushReqs...) br, err := ir.store.db.RunWithResponse(b) + ir.mu.Lock() + for _, intent := range pushIntents { + ir.mu.inFlight[intent.Txn.ID]-- + if ir.mu.inFlight[intent.Txn.ID] == 0 { + delete(ir.mu.inFlight, intent.Txn.ID) + } + } + ir.mu.Unlock() if err != nil { // TODO(bdarnell): return resolveIntents even on error. 
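(The refcounting added above, distilled into a standalone sketch. pushTracker and its method names are illustrative, not from the patch, and the map uses value keys as the final patch in this series settles on:)

package main

import (
    "fmt"
    "sync"
)

type txnID string // stand-in for uuid.UUID; any comparable key works

type pushTracker struct {
    mu       sync.Mutex
    inFlight map[txnID]int // txn ID -> number of pushes in progress
}

func newPushTracker() *pushTracker {
    return &pushTracker{inFlight: map[txnID]int{}}
}

func (t *pushTracker) start(id txnID, skipIfInFlight bool) bool {
    t.mu.Lock()
    defer t.mu.Unlock()
    if skipIfInFlight && t.inFlight[id] > 0 {
        return false // another goroutine is already pushing this txn
    }
    t.inFlight[id]++
    return true
}

func (t *pushTracker) done(id txnID) {
    t.mu.Lock()
    defer t.mu.Unlock()
    if t.inFlight[id]--; t.inFlight[id] == 0 {
        delete(t.inFlight, id)
    }
}

func main() {
    tr := newPushTracker()
    fmt.Println(tr.start("txn-1", true)) // true: first push proceeds
    fmt.Println(tr.start("txn-1", true)) // false: skipped, push already in flight
    tr.done("txn-1")
    fmt.Println(tr.start("txn-1", true)) // true again once the refcount drops to zero
}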
return nil, err @@ -208,7 +234,7 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA defer cancel() h := roachpb.Header{Timestamp: now} resolveIntents, pushErr := ir.maybePushTransactions(ctxWithTimeout, - item.intents, h, roachpb.PUSH_TOUCH) + item.intents, h, roachpb.PUSH_TOUCH, true /* skipInFlight */) if pErr := ir.resolveIntents(ctxWithTimeout, r, resolveIntents, true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { log.Warningc(ctxWithTimeout, "failed to resolve intents: %s", pErr) return From 441e602393d47cebdb9b2ef608838935c8ed934d Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Sun, 13 Mar 2016 23:36:02 -0400 Subject: [PATCH 7/9] util/stop: Introduce RunLimitedAsyncTask This method adds channel-based backpressure to RunAsyncTask, to avoid spinning up an unbounded number of goroutines. --- util/stop/stopper.go | 26 ++++++++++++++++++++++++ util/stop/stopper_test.go | 42 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/util/stop/stopper.go b/util/stop/stopper.go index fd0a7f95b310..25c699197ec8 100644 --- a/util/stop/stopper.go +++ b/util/stop/stopper.go @@ -142,6 +142,32 @@ func (s *Stopper) RunAsyncTask(f func()) bool { return true } +// RunLimitedAsyncTask runs function f in a goroutine, using the given +// channel as a semaphore to limit the number of tasks that are run +// concurrently to the channel's capacity. Blocks until the semaphore +// is available in order to push back on callers that may be trying to +// create many tasks. Returns false if the Stopper is draining and the +// function is not executed. +func (s *Stopper) RunLimitedAsyncTask(sem chan struct{}, f func()) bool { + file, line, _ := caller.Lookup(1) + key := taskKey{file, line} + select { + case sem <- struct{}{}: + case <-s.ShouldDrain(): + return false + } + if !s.runPrelude(key) { + <-sem + return false + } + go func() { + defer s.runPostlude(key) + defer func() { <-sem }() + f() + }() + return true +} + func (s *Stopper) runPrelude(key taskKey) bool { s.mu.Lock() defer s.mu.Unlock() diff --git a/util/stop/stopper_test.go b/util/stop/stopper_test.go index 346a67b579a1..48b9523cf6d2 100644 --- a/util/stop/stopper_test.go +++ b/util/stop/stopper_test.go @@ -18,6 +18,7 @@ package stop_test import ( "fmt" + "sync" "testing" "time" @@ -374,6 +375,47 @@ func TestStopperShouldDrain(t *testing.T) { <-cleanup } +func TestStopperRunLimitedAsyncTask(t *testing.T) { + defer leaktest.AfterTest(t)() + s := stop.NewStopper() + defer s.Stop() + + const maxConcurrency = 5 + const duration = 10 * time.Millisecond + sem := make(chan struct{}, maxConcurrency) + var mu sync.Mutex + concurrency := 0 + peakConcurrency := 0 + var wg sync.WaitGroup + + f := func() { + mu.Lock() + concurrency++ + if concurrency > peakConcurrency { + peakConcurrency = concurrency + } + mu.Unlock() + time.Sleep(duration) + mu.Lock() + concurrency-- + mu.Unlock() + wg.Done() + } + + for i := 0; i < maxConcurrency*3; i++ { + wg.Add(1) + s.RunLimitedAsyncTask(sem, f) + } + wg.Wait() + if concurrency != 0 { + t.Fatalf("expected 0 concurrency at end of test but got %d", concurrency) + } + if peakConcurrency != maxConcurrency { + t.Fatalf("expected peak concurrency %d to equal max concurrency %d", + peakConcurrency, maxConcurrency) + } +} + func maybePrint() { if testing.Verbose() { // This just needs to be complicated enough not to inline. 
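(For context, a stripped-down standalone version of the channel-semaphore technique RunLimitedAsyncTask is built on — no Stopper integration, just the bounded-concurrency mechanics; all names here are illustrative:)

package main

import (
    "fmt"
    "sync"
)

func runLimited(sem chan struct{}, wg *sync.WaitGroup, f func()) {
    sem <- struct{}{} // blocks while cap(sem) tasks are already running
    wg.Add(1)
    go func() {
        defer wg.Done()
        defer func() { <-sem }() // release the semaphore slot
        f()
    }()
}

func main() {
    sem := make(chan struct{}, 3) // at most 3 concurrent tasks
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        i := i
        runLimited(sem, &wg, func() { fmt.Println("task", i) })
    }
    wg.Wait()
}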
fmt.Println("blah") From 8e44cdc0a218ffddf751d540e528cc54136ef990 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Sun, 13 Mar 2016 23:44:43 -0400 Subject: [PATCH 8/9] storage: Limit the number of tasks started by intentResolver This is a last line of defense against issues like #4925. --- storage/intent_resolver.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/storage/intent_resolver.go b/storage/intent_resolver.go index 5edcbdf0af32..091be0673e2d 100644 --- a/storage/intent_resolver.go +++ b/storage/intent_resolver.go @@ -32,11 +32,20 @@ import ( "golang.org/x/net/context" ) +// intentResolverTaskLimit is the maximum number of asynchronous tasks +// that may be started by intentResolver. When this limit is reached +// asynchronous tasks will start to block to apply backpressure. +// This is a last line of defense against issues like #4925. +// TODO(bdarnell): how to determine best value? +const intentResolverTaskLimit = 100 + // intentResolver manages the process of pushing transactions and // resolving intents. type intentResolver struct { store *Store + sem chan struct{} // Semaphore to limit async goroutines + mu struct { sync.Mutex // Maps transaction ids to a refcount. @@ -47,6 +56,7 @@ type intentResolver struct { func newIntentResolver(store *Store) *intentResolver { ir := &intentResolver{ store: store, + sem: make(chan struct{}, intentResolverTaskLimit), } ir.mu.inFlight = map[*uuid.UUID]int{} return ir @@ -225,7 +235,7 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA for _, item := range intents { if item.args.Method() != roachpb.EndTransaction { - stopper.RunAsyncTask(func() { + stopper.RunLimitedAsyncTask(ir.sem, func() { // Everything here is best effort; give up rather than waiting // too long (helps avoid deadlocks during test shutdown, // although this is imperfect due to the use of an @@ -245,7 +255,7 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA } }) } else { // EndTransaction - stopper.RunAsyncTask(func() { + stopper.RunLimitedAsyncTask(ir.sem, func() { ctxWithTimeout, cancel := context.WithTimeout(ctx, base.NetworkTimeout) defer cancel() @@ -370,7 +380,7 @@ func (ir *intentResolver) resolveIntents(ctx context.Context, r *Replica, intent return pErr } wg.Add(1) - if wait || !r.store.Stopper().RunAsyncTask(func() { + if wait || !r.store.Stopper().RunLimitedAsyncTask(ir.sem, func() { if err := action(); err != nil { log.Warningf("unable to resolve local intents; %s", err) } @@ -393,7 +403,7 @@ func (ir *intentResolver) resolveIntents(ctx context.Context, r *Replica, intent // TODO(tschottdorf): no tracing here yet. return r.store.DB().Run(b) } - if wait || !r.store.Stopper().RunAsyncTask(func() { + if wait || !r.store.Stopper().RunLimitedAsyncTask(ir.sem, func() { if err := action(); err != nil { log.Warningf("unable to resolve external intents: %s", err) } From bae254cee69bafaa3d03b7c832cd70b9a2733a51 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Mon, 14 Mar 2016 16:06:47 -0400 Subject: [PATCH 9/9] storage: PR feedback Change map keys from pointers to values. Add logging. Wrap long lines and update comments. 
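(One of the feedback changes in the diff below adds a logged slow path to RunLimitedAsyncTask's semaphore acquisition. Distilled, the pattern looks roughly like this; sem and stop are stand-ins for the semaphore channel and the Stopper's drain channel:)

package main

import "fmt"

func acquire(sem chan struct{}, stop <-chan struct{}) bool {
    select {
    case sem <- struct{}{}: // fast path: capacity available
        return true
    case <-stop:
        return false
    default:
    }
    // Semaphore full: this is where the patch logs that the caller is
    // being throttled, then falls back to a blocking wait that still
    // honors shutdown.
    fmt.Println("throttling: semaphore full")
    select {
    case sem <- struct{}{}:
        return true
    case <-stop:
        return false
    }
}

func main() {
    sem := make(chan struct{}, 1)
    stop := make(chan struct{})
    fmt.Println(acquire(sem, stop)) // true: fast path
    close(stop)
    fmt.Println(acquire(sem, stop)) // false: sem full and stopper draining
}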
--- storage/gc_queue.go | 3 +- storage/intent_resolver.go | 62 ++++++++++++++++++++++++++------------ storage/replica_test.go | 11 ++++--- util/stop/stopper.go | 11 +++++++ 4 files changed, 62 insertions(+), 25 deletions(-) diff --git a/storage/gc_queue.go b/storage/gc_queue.go index 7c2336c45251..d331317d976d 100644 --- a/storage/gc_queue.go +++ b/storage/gc_queue.go @@ -283,7 +283,8 @@ func (gcq *gcQueue) process(now roachpb.Timestamp, repl *Replica, } } - if pErr := repl.store.intentResolver.resolveIntents(repl.context(), repl, intents, true /* wait */, false /* !poison */); pErr != nil { + if pErr := repl.store.intentResolver.resolveIntents(repl.context(), repl, intents, + true /* wait */, false /* !poison */); pErr != nil { return pErr.GoError() } diff --git a/storage/intent_resolver.go b/storage/intent_resolver.go index 091be0673e2d..93b5b3a8cc60 100644 --- a/storage/intent_resolver.go +++ b/storage/intent_resolver.go @@ -44,12 +44,12 @@ const intentResolverTaskLimit = 100 type intentResolver struct { store *Store - sem chan struct{} // Semaphore to limit async goroutines + sem chan struct{} // Semaphore to limit async goroutines. mu struct { sync.Mutex // Maps transaction ids to a refcount. - inFlight map[*uuid.UUID]int + inFlight map[uuid.UUID]int } } @@ -58,7 +58,7 @@ func newIntentResolver(store *Store) *intentResolver { store: store, sem: make(chan struct{}, intentResolverTaskLimit), } - ir.mu.inFlight = map[*uuid.UUID]int{} + ir.mu.inFlight = map[uuid.UUID]int{} return ir } @@ -69,10 +69,13 @@ func newIntentResolver(store *Store) *intentResolver { // // The returned error may be a copy of the original WriteIntentError, // with or without the Resolved flag set, which governs the client's -// retry behavior. (if the transaction is pushed, the Resolved flag is +// retry behavior (if the transaction is pushed, the Resolved flag is // set to tell the client to retry immediately; otherwise it is false // to cause the client to back off). -func (ir *intentResolver) processWriteIntentError(ctx context.Context, wiErr roachpb.WriteIntentError, r *Replica, args roachpb.Request, h roachpb.Header, pushType roachpb.PushTxnType) *roachpb.Error { +func (ir *intentResolver) processWriteIntentError(ctx context.Context, + wiErr roachpb.WriteIntentError, r *Replica, args roachpb.Request, h roachpb.Header, + pushType roachpb.PushTxnType) *roachpb.Error { + if log.V(6) { log.Infoc(ctx, "resolving write intent %s", wiErr) } @@ -81,7 +84,8 @@ func (ir *intentResolver) processWriteIntentError(ctx context.Context, wiErr roa readOnly := roachpb.IsReadOnly(args) // TODO(tschottdorf): pass as param resolveIntents, pushErr := ir.maybePushTransactions(ctx, wiErr.Intents, h, pushType, false) - if resErr := ir.resolveIntents(ctx, r, resolveIntents, false /* !wait */, true /* poison */); resErr != nil { + if resErr := ir.resolveIntents(ctx, r, resolveIntents, + false /* !wait */, true /* poison */); resErr != nil { // When resolving without waiting, errors should not // usually be returned here, although there are some cases // when they may be (especially when a test cluster is in @@ -127,6 +131,14 @@ func (ir *intentResolver) processWriteIntentError(ctx context.Context, wiErr roa // maybePushTransaction, but if the error is non-nil then some of the // conflicting transactions may still be pending. // +// If skipIfInFlight is true, then no PushTxns will be sent and no +// intents will be returned for any transaction for which there is +// another push in progress. 
This should only be used by callers who +// are not relying on the side effect of a push (i.e. only +// pushType==PUSH_TOUCH), and who also don't need to synchronize with +// the resolution of those intents (e.g. asynchronous resolutions of +// intents skipped on inconsistent reads). +// // Callers are involved with // a) conflict resolution for commands being executed at the Store with the // client waiting, @@ -134,7 +146,10 @@ func (ir *intentResolver) processWriteIntentError(ctx context.Context, wiErr roa // c) resolving intents upon EndTransaction which are not local to the given // range. This is the only path in which the transaction is going to be // in non-pending state and doesn't require a push. -func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []roachpb.Intent, h roachpb.Header, pushType roachpb.PushTxnType, skipInFlight bool) ([]roachpb.Intent, *roachpb.Error) { +func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []roachpb.Intent, + h roachpb.Header, pushType roachpb.PushTxnType, skipIfInFlight bool) ( + []roachpb.Intent, *roachpb.Error) { + now := ir.store.Clock().Now() pusherTxn := h.Txn @@ -161,13 +176,16 @@ func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []r // because the transaction is already finalized. // TODO(bdarnell): can this happen any more? resolveIntents = append(resolveIntents, intent) - } else if _, ok := ir.mu.inFlight[intent.Txn.ID]; ok && skipInFlight { + } else if _, ok := ir.mu.inFlight[*intent.Txn.ID]; ok && skipIfInFlight { // Another goroutine is working on this transaction so we can // skip it. + if log.V(1) { + log.Infof("skipping PushTxn for %s; attempt already in flight", intent.Txn.ID) + } continue } else { pushIntents = append(pushIntents, intent) - ir.mu.inFlight[intent.Txn.ID]++ + ir.mu.inFlight[*intent.Txn.ID]++ } } ir.mu.Unlock() @@ -198,9 +216,9 @@ func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []r br, err := ir.store.db.RunWithResponse(b) ir.mu.Lock() for _, intent := range pushIntents { - ir.mu.inFlight[intent.Txn.ID]-- - if ir.mu.inFlight[intent.Txn.ID] == 0 { - delete(ir.mu.inFlight, intent.Txn.ID) + ir.mu.inFlight[*intent.Txn.ID]-- + if ir.mu.inFlight[*intent.Txn.ID] == 0 { + delete(ir.mu.inFlight, *intent.Txn.ID) } } ir.mu.Unlock() @@ -221,7 +239,7 @@ func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []r // processIntentsAsync asynchronously processes intents which were // encountered during another command but did not interfere with the // execution of that command. This occurs in two cases: inconsistent -// reads and EndTransaction (which queues its own intents for +// reads and EndTransaction (which queues its own external intents for // processing via this method). The two cases are handled somewhat // differently and would be better served by different entry points, // but combining them simplifies the plumbing necessary in Replica. 
@@ -245,7 +263,8 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA h := roachpb.Header{Timestamp: now} resolveIntents, pushErr := ir.maybePushTransactions(ctxWithTimeout, item.intents, h, roachpb.PUSH_TOUCH, true /* skipInFlight */) - if pErr := ir.resolveIntents(ctxWithTimeout, r, resolveIntents, true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { + if pErr := ir.resolveIntents(ctxWithTimeout, r, resolveIntents, + true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { log.Warningc(ctxWithTimeout, "failed to resolve intents: %s", pErr) return } @@ -261,7 +280,8 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA // For EndTransaction, we know the transaction is finalized so // we can skip the push and go straight to the resolve. - if pErr := ir.resolveIntents(ctxWithTimeout, r, item.intents, true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { + if pErr := ir.resolveIntents(ctxWithTimeout, r, item.intents, + true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { log.Warningc(ctxWithTimeout, "failed to resolve intents: %s", pErr) return } @@ -302,8 +322,9 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA EndKey: r.Desc().EndKey.AsRawKey(), }, } - gcArgs.Keys = append(gcArgs.Keys, roachpb.GCRequest_GCKey{Key: keys.TransactionKey(txn.Key, txn.ID)}) - + gcArgs.Keys = append(gcArgs.Keys, roachpb.GCRequest_GCKey{ + Key: keys.TransactionKey(txn.Key, txn.ID), + }) ba.Add(&gcArgs) if _, pErr := r.addWriteCmd(ctxWithTimeout, ba, nil /* nil */); pErr != nil { log.Warningf("could not GC completed transaction: %s", pErr) @@ -322,11 +343,14 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA // executed). This ensures that if a waiting client retries // immediately after calling this function, it will not hit the same // intents again. -func (ir *intentResolver) resolveIntents(ctx context.Context, r *Replica, intents []roachpb.Intent, wait bool, poison bool) *roachpb.Error { +func (ir *intentResolver) resolveIntents(ctx context.Context, r *Replica, + intents []roachpb.Intent, wait bool, poison bool) *roachpb.Error { + sp, cleanupSp := tracing.SpanFromContext(opReplica, ir.store.Tracer(), ctx) defer cleanupSp() - ctx = opentracing.ContextWithSpan(ctx, nil) // we're doing async stuff below; those need new traces + // We're doing async stuff below; those need new traces. 
+ ctx = opentracing.ContextWithSpan(ctx, nil) sp.LogEvent(fmt.Sprintf("resolving intents [wait=%t]", wait)) var reqsRemote []roachpb.Request diff --git a/storage/replica_test.go b/storage/replica_test.go index e1445dde6d08..a311426c1fcb 100644 --- a/storage/replica_test.go +++ b/storage/replica_test.go @@ -2286,11 +2286,12 @@ func TestReplicaResolveIntentNoWait(t *testing.T) { setupResolutionTest(t, tc, roachpb.Key("a") /* irrelevant */, splitKey) txn := newTransaction("name", key, 1, roachpb.SERIALIZABLE, tc.clock) txn.Status = roachpb.COMMITTED - if pErr := tc.store.intentResolver.resolveIntents(context.Background(), tc.rng, []roachpb.Intent{{ - Span: roachpb.Span{Key: key}, - Txn: txn.TxnMeta, - Status: txn.Status, - }}, false /* !wait */, false /* !poison; irrelevant */); pErr != nil { + if pErr := tc.store.intentResolver.resolveIntents(context.Background(), tc.rng, + []roachpb.Intent{{ + Span: roachpb.Span{Key: key}, + Txn: txn.TxnMeta, + Status: txn.Status, + }}, false /* !wait */, false /* !poison; irrelevant */); pErr != nil { t.Fatal(pErr) } util.SucceedsSoon(t, func() error { diff --git a/util/stop/stopper.go b/util/stop/stopper.go index 25c699197ec8..203f3cecc952 100644 --- a/util/stop/stopper.go +++ b/util/stop/stopper.go @@ -151,11 +151,22 @@ func (s *Stopper) RunAsyncTask(f func()) bool { func (s *Stopper) RunLimitedAsyncTask(sem chan struct{}, f func()) bool { file, line, _ := caller.Lookup(1) key := taskKey{file, line} + + // Wait for permission to run from the semaphore. select { case sem <- struct{}{}: case <-s.ShouldDrain(): return false + default: + log.Printf("stopper throttling task from %s:%d due to semaphore", file, line) + // Retry the select without the default. + select { + case sem <- struct{}{}: + case <-s.ShouldDrain(): + return false + } } + if !s.runPrelude(key) { <-sem return false