From ed0937ca424eec5c6063398adf5fd4743194599e Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Wed, 29 Jun 2016 12:08:18 -0400 Subject: [PATCH] storage: parallelize processing of Replica.handleRaftReady Improves the performance of `block_writer` inserting 1m rows from 250s to 195s. We default concurrency per store to `2*num-cpu`. There wasn't any noticeable harm or benefit to allowing more concurrency than that, but it seems prudent to provide some limit. Fixes #7522. --- storage/store.go | 47 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/storage/store.go b/storage/store.go index 7eaf437a51fa..0b7442fd69e7 100644 --- a/storage/store.go +++ b/storage/store.go @@ -19,6 +19,7 @@ package storage import ( "bytes" "fmt" + "runtime" "sync" "sync/atomic" "time" @@ -87,6 +88,10 @@ var changeTypeInternalToRaft = map[roachpb.ReplicaChangeType]raftpb.ConfChangeTy roachpb.REMOVE_REPLICA: raftpb.ConfChangeRemoveNode, } +// The amount of concurrency to allow in processing replicas on a per-store +// basis. +var storeConcurrency = 2 * runtime.NumCPU() + // TestStoreContext has some fields initialized with values relevant // in tests. func TestStoreContext() StoreContext { @@ -102,6 +107,20 @@ func TestStoreContext() StoreContext { } } +type semaphore chan struct{} + +func makeSemaphore(n int) semaphore { + return make(semaphore, n) +} + +func (s semaphore) acquire() { + s <- struct{}{} +} + +func (s semaphore) release() { + <-s +} + // verifyKeys verifies keys. If checkEndKey is true, then the end key // is verified to be non-nil and greater than start key. If // checkEndKey is false, end key is verified to be nil. Additionally, @@ -1492,9 +1511,9 @@ func (s *Store) SplitRange(origRng, newRng *Replica) error { return s.processRangeDescriptorUpdateLocked(origRng) } -// MergeRange expands the subsuming range to absorb the subsumed range. -// This merge operation will fail if the two ranges are not collocated -// on the same store. Must be called from the processRaft goroutine. +// MergeRange expands the subsuming range to absorb the subsumed range. This +// merge operation will fail if the two ranges are not collocated on the same +// store. Must be called (perhaps indirectly) from the processRaft goroutine. func (s *Store) MergeRange(subsumingRng *Replica, updatedEndKey roachpb.RKey, subsumedRangeID roachpb.RangeID) error { subsumingDesc := subsumingRng.Desc() @@ -1514,8 +1533,9 @@ func (s *Store) MergeRange(subsumingRng *Replica, updatedEndKey roachpb.RKey, su subsumedDesc.Replicas, subsumingDesc.Replicas) } - // Remove and destroy the subsumed range. Note that we are on the - // processRaft goroutine so we can call removeReplicaImpl directly. + // Remove and destroy the subsumed range. Note that we were called + // (indirectly) from the processRaft goroutine so we must call + // removeReplicaImpl directly to avoid deadlocking on processRaftMu. if err := s.removeReplicaImpl(subsumedRng, *subsumedDesc, false); err != nil { return errors.Errorf("cannot remove range %s", err) } @@ -2140,6 +2160,8 @@ func (s *Store) enqueueRaftUpdateCheck(rangeID roachpb.RangeID) { // appropriate range. This method starts a goroutine to process Raft // commands indefinitely or until the stopper signals. 
func (s *Store) processRaft() { + sem := makeSemaphore(storeConcurrency) + s.stopper.RunWorker(func() { defer s.ctx.Transport.Stop(s.StoreID()) ticker := time.NewTicker(s.ctx.RaftTickInterval) @@ -2159,11 +2181,20 @@ func (s *Store) processRaft() { } s.pendingRaftGroups.Unlock() s.mu.Unlock() + + var wg sync.WaitGroup + wg.Add(len(replicas)) for _, r := range replicas { - if err := r.handleRaftReady(); err != nil { - panic(err) // TODO(bdarnell) - } + sem.acquire() + go func(r *Replica) { + if err := r.handleRaftReady(); err != nil { + panic(err) // TODO(bdarnell) + } + wg.Done() + sem.release() + }(r) } + wg.Wait() s.processRaftMu.Unlock() select {
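
Note on the pattern: the change above amounts to a buffered-channel semaphore combined with a `sync.WaitGroup`. The channel's capacity caps how many `handleRaftReady` calls run at once, while the WaitGroup preserves the invariant that the whole batch finishes before `processRaftMu` is released. Below is a minimal, standalone sketch of that shape; `processAll` and its integer work items are hypothetical stand-ins for the replica loop and `r.handleRaftReady()`, not identifiers from this patch.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// semaphore mirrors the type added by this patch: a buffered channel
// whose capacity bounds the number of concurrent holders.
type semaphore chan struct{}

func makeSemaphore(n int) semaphore { return make(semaphore, n) }
func (s semaphore) acquire()        { s <- struct{}{} }
func (s semaphore) release()        { <-s }

// processAll fans work out to goroutines, bounded by sem, and waits for
// the whole batch to finish before returning -- the same shape as the
// new processRaft loop body. (processAll and the int items are
// illustrative stand-ins, not part of the patch.)
func processAll(items []int, sem semaphore) {
	var wg sync.WaitGroup
	wg.Add(len(items))
	for _, item := range items {
		// Acquire before spawning: once cap(sem) goroutines are in
		// flight, this blocks the spawner, bounding concurrency.
		sem.acquire()
		go func(item int) {
			defer sem.release()
			defer wg.Done()
			fmt.Println("processed", item) // stand-in for r.handleRaftReady()
		}(item)
	}
	wg.Wait() // every item is done before the caller proceeds
}

func main() {
	sem := makeSemaphore(2 * runtime.NumCPU()) // cf. storeConcurrency
	processAll([]int{1, 2, 3, 4, 5}, sem)
}
```

Acquiring the semaphore in the loop, before spawning, applies backpressure to the spawner itself, so at most `storeConcurrency` goroutines are ever in flight; acquiring inside the goroutine would instead start one goroutine per replica immediately and only throttle the work they do.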