Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: parallelize processing of Replica.handleRaftReady #7531

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 37 additions & 8 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package storage
import (
"bytes"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -87,6 +88,8 @@ var changeTypeInternalToRaft = map[roachpb.ReplicaChangeType]raftpb.ConfChangeTy
roachpb.REMOVE_REPLICA: raftpb.ConfChangeRemoveNode,
}

// storeReplicaRaftReadyConcurrency is the semaphore capacity used by
// processRaft: it caps how many Replica.handleRaftReady calls may be running
// in their own goroutines at the same time. It is set to twice the value
// reported by runtime.NumCPU.
var storeReplicaRaftReadyConcurrency = 2 * runtime.NumCPU()

// TestStoreContext has some fields initialized with values relevant
// in tests.
func TestStoreContext() StoreContext {
Expand All @@ -102,6 +105,20 @@ func TestStoreContext() StoreContext {
}
}

// semaphore is a counting semaphore built on a buffered channel: each
// element sitting in the channel buffer represents one held slot.
type semaphore chan struct{}

// makeSemaphore returns a semaphore whose channel buffer has room for n
// simultaneously held slots.
func makeSemaphore(n int) semaphore {
	slots := make(chan struct{}, n)
	return semaphore(slots)
}

// acquire takes one slot by sending a token into the channel. It blocks
// while the buffer is full, i.e. until some holder calls release.
func (sem semaphore) acquire() {
	var token struct{}
	sem <- token
}

// release gives one slot back by receiving a token from the channel. It
// blocks if no slot is currently held.
func (sem semaphore) release() {
	<-sem
}

// verifyKeys verifies keys. If checkEndKey is true, then the end key
// is verified to be non-nil and greater than start key. If
// checkEndKey is false, end key is verified to be nil. Additionally,
Expand Down Expand Up @@ -1492,9 +1509,9 @@ func (s *Store) SplitRange(origRng, newRng *Replica) error {
return s.processRangeDescriptorUpdateLocked(origRng)
}

// MergeRange expands the subsuming range to absorb the subsumed range.
// This merge operation will fail if the two ranges are not collocated
// on the same store. Must be called from the processRaft goroutine.
// MergeRange expands the subsuming range to absorb the subsumed range. This
// merge operation will fail if the two ranges are not collocated on the same
// store. Must be called (perhaps indirectly) from the processRaft goroutine.
func (s *Store) MergeRange(subsumingRng *Replica, updatedEndKey roachpb.RKey, subsumedRangeID roachpb.RangeID) error {
subsumingDesc := subsumingRng.Desc()

Expand All @@ -1514,8 +1531,9 @@ func (s *Store) MergeRange(subsumingRng *Replica, updatedEndKey roachpb.RKey, su
subsumedDesc.Replicas, subsumingDesc.Replicas)
}

// Remove and destroy the subsumed range. Note that we are on the
// processRaft goroutine so we can call removeReplicaImpl directly.
// Remove and destroy the subsumed range. Note that we were called
// (indirectly) from the processRaft goroutine so we must call
// removeReplicaImpl directly to avoid deadlocking on processRaftMu.
if err := s.removeReplicaImpl(subsumedRng, *subsumedDesc, false); err != nil {
return errors.Errorf("cannot remove range %s", err)
}
Expand Down Expand Up @@ -2140,6 +2158,8 @@ func (s *Store) enqueueRaftUpdateCheck(rangeID roachpb.RangeID) {
// appropriate range. This method starts a goroutine to process Raft
// commands indefinitely or until the stopper signals.
func (s *Store) processRaft() {
sem := makeSemaphore(storeReplicaRaftReadyConcurrency)

s.stopper.RunWorker(func() {
defer s.ctx.Transport.Stop(s.StoreID())
ticker := time.NewTicker(s.ctx.RaftTickInterval)
Expand All @@ -2159,11 +2179,20 @@ func (s *Store) processRaft() {
}
s.pendingRaftGroups.Unlock()
s.mu.Unlock()

var wg sync.WaitGroup
wg.Add(len(replicas))
for _, r := range replicas {
if err := r.handleRaftReady(); err != nil {
panic(err) // TODO(bdarnell)
}
sem.acquire()
go func(r *Replica) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're avoiding the stopper on purpose, a comment wouldn't hurt, so that this doesn't look like an omission.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, we weren't previously checking the stopper in this code, but instead processing all of the replicas. I'm not sure why we would need to check the stopper with this change.

if err := r.handleRaftReady(); err != nil {
panic(err) // TODO(bdarnell)
}
wg.Done()
sem.release()
}(r)
}
wg.Wait()
s.processRaftMu.Unlock()

select {
Expand Down