Skip to content

Commit

Permalink
storage: Limit the number of tasks started by intentResolver
Browse files Browse the repository at this point in the history
This is a last line of defense against issues like cockroachdb#4925.
  • Loading branch information
bdarnell committed Mar 14, 2016
1 parent 441e602 commit 8e44cdc
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions storage/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,20 @@ import (
"golang.org/x/net/context"
)

// intentResolverTaskLimit is the maximum number of asynchronous tasks
// that may be started by intentResolver. When this limit is reached
// asynchronous tasks will start to block to apply backpressure.
// This is a last line of defense against issues like #4925.
// TODO(bdarnell): how to determine best value?
const intentResolverTaskLimit = 100

// intentResolver manages the process of pushing transactions and
// resolving intents.
type intentResolver struct {
store *Store

sem chan struct{} // Semaphore to limit async goroutines

mu struct {
sync.Mutex
// Maps transaction ids to a refcount.
Expand All @@ -47,6 +56,7 @@ type intentResolver struct {
func newIntentResolver(store *Store) *intentResolver {
ir := &intentResolver{
store: store,
sem: make(chan struct{}, intentResolverTaskLimit),
}
ir.mu.inFlight = map[*uuid.UUID]int{}
return ir
Expand Down Expand Up @@ -225,7 +235,7 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA

for _, item := range intents {
if item.args.Method() != roachpb.EndTransaction {
stopper.RunAsyncTask(func() {
stopper.RunLimitedAsyncTask(ir.sem, func() {
// Everything here is best effort; give up rather than waiting
// too long (helps avoid deadlocks during test shutdown,
// although this is imperfect due to the use of an
Expand All @@ -245,7 +255,7 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA
}
})
} else { // EndTransaction
stopper.RunAsyncTask(func() {
stopper.RunLimitedAsyncTask(ir.sem, func() {
ctxWithTimeout, cancel := context.WithTimeout(ctx, base.NetworkTimeout)
defer cancel()

Expand Down Expand Up @@ -370,7 +380,7 @@ func (ir *intentResolver) resolveIntents(ctx context.Context, r *Replica, intent
return pErr
}
wg.Add(1)
if wait || !r.store.Stopper().RunAsyncTask(func() {
if wait || !r.store.Stopper().RunLimitedAsyncTask(ir.sem, func() {
if err := action(); err != nil {
log.Warningf("unable to resolve local intents; %s", err)
}
Expand All @@ -393,7 +403,7 @@ func (ir *intentResolver) resolveIntents(ctx context.Context, r *Replica, intent
// TODO(tschottdorf): no tracing here yet.
return r.store.DB().Run(b)
}
if wait || !r.store.Stopper().RunAsyncTask(func() {
if wait || !r.store.Stopper().RunLimitedAsyncTask(ir.sem, func() {
if err := action(); err != nil {
log.Warningf("unable to resolve external intents: %s", err)
}
Expand Down

0 comments on commit 8e44cdc

Please sign in to comment.