Skip to content

Commit

Permalink
Merge pull request cockroachdb#8976 from petermattis/pmattis/synchron…
Browse files Browse the repository at this point in the history
…ize-process-replicas

storage: avoid processing replicas concurrently in a queue
  • Loading branch information
petermattis authored Aug 31, 2016
2 parents 33f28d3 + 4593774 commit 10a9430
Showing 1 changed file with 9 additions and 0 deletions.
9 changes: 9 additions & 0 deletions storage/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ type baseQueue struct {
replicas map[roachpb.RangeID]*replicaItem // Map from RangeID to replicaItem (for updating priority)
purgatory map[roachpb.RangeID]error // Map of replicas to processing errors
}
// processMu synchronizes execution of processing for a single queue,
// ensuring that we never process more than a single replica at a time. This
// is needed because both the main processing loop and the purgatory loop can
// process replicas.
processMu sync.Locker
// Some tests in this package disable queues.
disabled int32 // updated atomically

Expand Down Expand Up @@ -230,6 +235,7 @@ func makeBaseQueue(
}
bq.mu.Locker = new(syncutil.Mutex)
bq.mu.replicas = map[roachpb.RangeID]*replicaItem{}
bq.processMu = new(syncutil.Mutex)
return bq
}

Expand Down Expand Up @@ -443,6 +449,9 @@ func (bq *baseQueue) processLoop(clock *hlc.Clock, stopper *stop.Stopper) {
// called externally to the queue. bq.mu.Lock should not be held
// while calling this method.
func (bq *baseQueue) processReplica(repl *Replica, clock *hlc.Clock) error {
bq.processMu.Lock()
defer bq.processMu.Unlock()

// Load the system config.
cfg, ok := bq.gossip.GetSystemConfig()
if !ok {
Expand Down

0 comments on commit 10a9430

Please sign in to comment.