From 8e44cdc0a218ffddf751d540e528cc54136ef990 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Sun, 13 Mar 2016 23:44:43 -0400 Subject: [PATCH] storage: Limit the number of tasks started by intentResolver This is a last line of defense against issues like #4925. --- storage/intent_resolver.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/storage/intent_resolver.go b/storage/intent_resolver.go index 5edcbdf0af32..091be0673e2d 100644 --- a/storage/intent_resolver.go +++ b/storage/intent_resolver.go @@ -32,11 +32,20 @@ import ( "golang.org/x/net/context" ) +// intentResolverTaskLimit is the maximum number of asynchronous tasks +// that may be started by intentResolver. When this limit is reached +// asynchronous tasks will start to block to apply backpressure. +// This is a last line of defense against issues like #4925. +// TODO(bdarnell): how to determine best value? +const intentResolverTaskLimit = 100 + // intentResolver manages the process of pushing transactions and // resolving intents. type intentResolver struct { store *Store + sem chan struct{} // Semaphore to limit async goroutines + mu struct { sync.Mutex // Maps transaction ids to a refcount. @@ -47,6 +56,7 @@ type intentResolver struct { func newIntentResolver(store *Store) *intentResolver { ir := &intentResolver{ store: store, + sem: make(chan struct{}, intentResolverTaskLimit), } ir.mu.inFlight = map[*uuid.UUID]int{} return ir @@ -225,7 +235,7 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA for _, item := range intents { if item.args.Method() != roachpb.EndTransaction { - stopper.RunAsyncTask(func() { + stopper.RunLimitedAsyncTask(ir.sem, func() { // Everything here is best effort; give up rather than waiting // too long (helps avoid deadlocks during test shutdown, // although this is imperfect due to the use of an @@ -245,7 +255,7 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA } }) } else { // EndTransaction - stopper.RunAsyncTask(func() { + stopper.RunLimitedAsyncTask(ir.sem, func() { ctxWithTimeout, cancel := context.WithTimeout(ctx, base.NetworkTimeout) defer cancel() @@ -370,7 +380,7 @@ func (ir *intentResolver) resolveIntents(ctx context.Context, r *Replica, intent return pErr } wg.Add(1) - if wait || !r.store.Stopper().RunAsyncTask(func() { + if wait || !r.store.Stopper().RunLimitedAsyncTask(ir.sem, func() { if err := action(); err != nil { log.Warningf("unable to resolve local intents; %s", err) } @@ -393,7 +403,7 @@ func (ir *intentResolver) resolveIntents(ctx context.Context, r *Replica, intent // TODO(tschottdorf): no tracing here yet. return r.store.DB().Run(b) } - if wait || !r.store.Stopper().RunAsyncTask(func() { + if wait || !r.store.Stopper().RunLimitedAsyncTask(ir.sem, func() { if err := action(); err != nil { log.Warningf("unable to resolve external intents: %s", err) }