Skip to content

Commit

Permalink
Merge pull request #7531 from petermattis/pmattis/parallel-handle-raf…
Browse files Browse the repository at this point in the history
…t-ready

storage: parallelize processing of Replica.handleRaftReady
  • Loading branch information
petermattis authored Jun 29, 2016
2 parents 6cb6264 + 5b04a8f commit a97d757
Showing 1 changed file with 37 additions and 8 deletions.
45 changes: 37 additions & 8 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package storage
import (
"bytes"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -87,6 +88,8 @@ var changeTypeInternalToRaft = map[roachpb.ReplicaChangeType]raftpb.ConfChangeTy
roachpb.REMOVE_REPLICA: raftpb.ConfChangeRemoveNode,
}

var storeReplicaRaftReadyConcurrency = 2 * runtime.NumCPU()

// TestStoreContext has some fields initialized with values relevant
// in tests.
func TestStoreContext() StoreContext {
Expand All @@ -102,6 +105,20 @@ func TestStoreContext() StoreContext {
}
}

type semaphore chan struct{}

func makeSemaphore(n int) semaphore {
return make(semaphore, n)
}

func (s semaphore) acquire() {
s <- struct{}{}
}

func (s semaphore) release() {
<-s
}

// verifyKeys verifies keys. If checkEndKey is true, then the end key
// is verified to be non-nil and greater than start key. If
// checkEndKey is false, end key is verified to be nil. Additionally,
Expand Down Expand Up @@ -1492,9 +1509,9 @@ func (s *Store) SplitRange(origRng, newRng *Replica) error {
return s.processRangeDescriptorUpdateLocked(origRng)
}

// MergeRange expands the subsuming range to absorb the subsumed range.
// This merge operation will fail if the two ranges are not collocated
// on the same store. Must be called from the processRaft goroutine.
// MergeRange expands the subsuming range to absorb the subsumed range. This
// merge operation will fail if the two ranges are not collocated on the same
// store. Must be called (perhaps indirectly) from the processRaft goroutine.
func (s *Store) MergeRange(subsumingRng *Replica, updatedEndKey roachpb.RKey, subsumedRangeID roachpb.RangeID) error {
subsumingDesc := subsumingRng.Desc()

Expand All @@ -1514,8 +1531,9 @@ func (s *Store) MergeRange(subsumingRng *Replica, updatedEndKey roachpb.RKey, su
subsumedDesc.Replicas, subsumingDesc.Replicas)
}

// Remove and destroy the subsumed range. Note that we are on the
// processRaft goroutine so we can call removeReplicaImpl directly.
// Remove and destroy the subsumed range. Note that we were called
// (indirectly) from the processRaft goroutine so we must call
// removeReplicaImpl directly to avoid deadlocking on processRaftMu.
if err := s.removeReplicaImpl(subsumedRng, *subsumedDesc, false); err != nil {
return errors.Errorf("cannot remove range %s", err)
}
Expand Down Expand Up @@ -2140,6 +2158,8 @@ func (s *Store) enqueueRaftUpdateCheck(rangeID roachpb.RangeID) {
// appropriate range. This method starts a goroutine to process Raft
// commands indefinitely or until the stopper signals.
func (s *Store) processRaft() {
sem := makeSemaphore(storeReplicaRaftReadyConcurrency)

s.stopper.RunWorker(func() {
defer s.ctx.Transport.Stop(s.StoreID())
ticker := time.NewTicker(s.ctx.RaftTickInterval)
Expand All @@ -2159,11 +2179,20 @@ func (s *Store) processRaft() {
}
s.pendingRaftGroups.Unlock()
s.mu.Unlock()

var wg sync.WaitGroup
wg.Add(len(replicas))
for _, r := range replicas {
if err := r.handleRaftReady(); err != nil {
panic(err) // TODO(bdarnell)
}
sem.acquire()
go func(r *Replica) {
if err := r.handleRaftReady(); err != nil {
panic(err) // TODO(bdarnell)
}
wg.Done()
sem.release()
}(r)
}
wg.Wait()
s.processRaftMu.Unlock()

select {
Expand Down

0 comments on commit a97d757

Please sign in to comment.