storage: remove handleRaftReady from processRequestQueue, pool RaftMessageRequests #41507

Merged
16 changes: 16 additions & 0 deletions pkg/storage/raft.go
@@ -14,6 +14,7 @@ import (
"bytes"
"context"
"fmt"
"sync"

"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -207,6 +208,21 @@ func raftEntryFormatter(data []byte) string {
return fmt.Sprintf("[%x] [%d]", commandID, len(data))
}

var raftMessageRequestPool = sync.Pool{
New: func() interface{} {
return &RaftMessageRequest{}
},
}

func newRaftMessageRequest() *RaftMessageRequest {
return raftMessageRequestPool.Get().(*RaftMessageRequest)
}

func (m *RaftMessageRequest) release() {
*m = RaftMessageRequest{}
raftMessageRequestPool.Put(m)
}

// IsPreemptive returns whether this is a preemptive snapshot or a Raft
// snapshot.
func (h *SnapshotRequest_Header) IsPreemptive() bool {
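The helpers above implement a standard sync.Pool get/reset/put cycle: release zeroes the struct before returning it, so a later Get never observes stale fields and the old message's slices become collectible instead of staying reachable through the pool. Below is a minimal, self-contained sketch of the same pattern, using a stand-in message type rather than the real RaftMessageRequest.

package main

import (
	"fmt"
	"sync"
)

type message struct {
	rangeID int64
	payload []byte
}

var messagePool = sync.Pool{
	New: func() interface{} { return &message{} },
}

func newMessage() *message {
	return messagePool.Get().(*message)
}

// release zeroes the struct before returning it to the pool so that a later
// Get never sees stale fields and the old payload slice can be collected.
func (m *message) release() {
	*m = message{}
	messagePool.Put(m)
}

func main() {
	m := newMessage()
	m.rangeID = 7
	m.payload = append(m.payload, 0xca, 0xfe)
	fmt.Println("sending range", m.rangeID, "with", len(m.payload), "bytes")
	m.release() // m must not be used after this point
}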
7 changes: 5 additions & 2 deletions pkg/storage/raft_transport.go
@@ -492,13 +492,15 @@ func (t *RaftTransport) processQueue(
return err
case req := <-ch:
batch.Requests = append(batch.Requests, *req)
req.release()
// Pull off as many queued requests as possible.
//
// TODO(peter): Think about limiting the size of the batch we send.
for done := false; !done; {
select {
case req = <-ch:
batch.Requests = append(batch.Requests, *req)
req.release()
default:
done = true
}
@@ -532,8 +534,9 @@ func (t *RaftTransport) getQueue(
// SendAsync sends a message to the recipient specified in the request. It
// returns false if the outgoing queue is full. The returned bool may be a false
// positive but will never be a false negative; if sent is true the message may
// or may not actually be sent but if it's false the message definitely was
// not sent.
// or may not actually be sent but if it's false the message definitely was not
// sent. If the method does return true, it is not safe to continue using the
// reference to the provided request.
func (t *RaftTransport) SendAsync(req *RaftMessageRequest, class rpc.ConnectionClass) (sent bool) {
toNodeID := req.ToReplica.NodeID
stats := t.getStats(toNodeID, class)
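The extended SendAsync comment above establishes an ownership hand-off: on a true return the transport owns the request (processQueue releases it back to the pool after copying it into the batch), while on a false return the caller still owns it and may release or reuse it. The following is a short sketch of that caller-side rule, with a hypothetical sender interface standing in for *RaftTransport; only the ownership flow mirrors the change.

package main

import "fmt"

type request struct{ rangeID int64 }

type sender interface {
	// SendAsync takes ownership of req when it returns true; the callee is
	// then responsible for releasing it once it has been serialized.
	SendAsync(req *request) bool
}

// dropSender always refuses the hand-off, as a full outgoing queue would.
type dropSender struct{}

func (dropSender) SendAsync(*request) bool { return false }

// send shows the pattern from sendRaftMessage: allocate, hand off, and only
// release (or otherwise keep using) the request if the hand-off was refused.
func send(s sender, rangeID int64, release func(*request)) {
	req := &request{rangeID: rangeID}
	if !s.SendAsync(req) {
		// Ownership stayed with the caller: safe to release or retry.
		release(req)
		return
	}
	// Ownership transferred: req must not be touched from here on.
}

func main() {
	send(dropSender{}, 1, func(r *request) { fmt.Println("released range", r.rangeID) })
}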
8 changes: 5 additions & 3 deletions pkg/storage/raft_transport_test.go
@@ -289,9 +289,9 @@ func TestSendAndReceive(t *testing.T) {
req.Message.Type = messageType

if !transports[fromNodeID].SendAsync(&req, rpc.DefaultClass) {
t.Errorf("unable to send %s from %d to %d", req.Message.Type, fromNodeID, toNodeID)
t.Errorf("unable to send %s from %d to %d", messageType, fromNodeID, toNodeID)
}
messageTypeCounts[toStoreID][req.Message.Type]++
messageTypeCounts[toStoreID][messageType]++
}
}
}
@@ -356,7 +356,9 @@ func TestSendAndReceive(t *testing.T) {
ReplicaID: replicaIDs[toStoreID],
},
}
if !transports[storeNodes[fromStoreID]].SendAsync(expReq, rpc.DefaultClass) {
// NB: the argument passed to SendAsync is not safe to use after the call; make a copy.
expReqCopy := *expReq
if !transports[storeNodes[fromStoreID]].SendAsync(&expReqCopy, rpc.DefaultClass) {
t.Errorf("unable to send message from %d to %d", fromStoreID, toStoreID)
}
// NB: proto.Equal will panic here since it doesn't know about `gogoproto.casttype`.
9 changes: 6 additions & 3 deletions pkg/storage/replica_raft.go
@@ -416,7 +416,7 @@ type handleRaftReadyStats struct {
var noSnap IncomingSnapshot

// handleRaftReady processes a raft.Ready containing entries and messages that
// are ready to read, be saved to stable storage, committed or sent to other
// are ready to read, be saved to stable storage, committed, or sent to other
// peers. It takes a non-empty IncomingSnapshot to indicate that it is
// about to process a snapshot.
//
@@ -1167,13 +1167,16 @@ func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) {
return
}

if !r.sendRaftMessageRequest(ctx, &RaftMessageRequest{
req := newRaftMessageRequest()
*req = RaftMessageRequest{
RangeID: r.RangeID,
ToReplica: toReplica,
FromReplica: fromReplica,
Message: msg,
RangeStartKey: startKey, // usually nil
}) {
}
if !r.sendRaftMessageRequest(ctx, req) {
req.release()
if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) {
r.mu.droppedMessages++
raftGroup.ReportUnreachable(msg.To)
50 changes: 19 additions & 31 deletions pkg/storage/scheduler.go
@@ -103,10 +103,15 @@ func (q *rangeIDQueue) back() *rangeIDChunk {
}

type raftProcessor interface {
// Process a raft.Ready struct containing entries and messages that are
// ready to read, be saved to stable storage, committed, or sent to other
// peers.
processReady(context.Context, roachpb.RangeID)
processRequestQueue(context.Context, roachpb.RangeID)
// Process a raft tick for the specified range. Return true if the range
// should be queued for ready processing.
// Process all queued messages for the specified range.
// Return true if the range should be queued for ready processing.
processRequestQueue(context.Context, roachpb.RangeID) bool
// Process a raft tick for the specified range.
// Return true if the range should be queued for ready processing.
processTick(context.Context, roachpb.RangeID) bool
}

@@ -199,44 +204,27 @@ func (s *raftScheduler) worker(ctx context.Context) {
s.mu.state[id] = stateQueued
s.mu.Unlock()

// Process requests first. This avoids a scenario where a tick and a
// "quiesce" message are processed in the same iteration and intervening
// raft ready processing unquiesces the replica because the tick triggers
// an election.
if state&stateRaftRequest != 0 {
// processRequestQueue returns true if the range should perform ready
// processing. Do not reorder this below the call to processReady.
if s.processor.processRequestQueue(ctx, id) {
state |= stateRaftReady
}
}
if state&stateRaftTick != 0 {
// processTick returns true if the range should perform ready
// processing. Do not reorder this below the call to processReady.
if s.processor.processTick(ctx, id) {
state |= stateRaftReady
}
}
// TODO(nvanbenschoten): Consider removing the call to handleRaftReady
// from processRequestQueue. If we did this then processReady would be
// the only place where we call into handleRaftReady. This would
// eliminate superfluous calls into the function and would improve
// batching. It would also simplify the code in processRequestQueue.
//
// The code change here would likely look something like:
//
// if state&stateRaftRequest != 0 {
// if s.processor.processRequestQueue(ctx, id) {
// state |= stateRaftReady
// }
// }
//
// Initial experimentation with this approach indicated that it reduced
// throughput for single-Range write-heavy workloads. More investigation
// is needed to determine whether that should be expected.
if state&stateRaftReady != 0 {
s.processor.processReady(ctx, id)
}
// Process requests last. This avoids a scenario where a tick and a
// "quiesce" message are processed in the same iteration and intervening
// raft ready processing unquiesced the replica. Note that request
// processing could also occur first, it just shouldn't occur in between
// ticking and ready processing. It is possible for a tick to be enqueued
// concurrently with the quiescing in which case the replica will
// unquiesce when the tick is processed, but we'll wake the leader in
// that case.
if state&stateRaftRequest != 0 {
s.processor.processRequestQueue(ctx, id)
}

s.mu.Lock()
state = s.mu.state[id]
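The reordered worker loop now folds the booleans returned by processRequestQueue and processTick into the ready bit before calling processReady, so ready handling runs at most once per scheduling pass. Below is a simplified, self-contained sketch of that flow; the flag constants and the processor interface are modeled on the diff but are not the real implementations.

package main

import (
	"context"
	"fmt"
)

const (
	stateQueued = 1 << iota
	stateRaftReady
	stateRaftRequest
	stateRaftTick
)

type processor interface {
	processRequestQueue(ctx context.Context, id int64) bool // true => needs ready handling
	processTick(ctx context.Context, id int64) bool         // true => needs ready handling
	processReady(ctx context.Context, id int64)
}

func step(ctx context.Context, p processor, id int64, state int) {
	// Requests are handled before the tick and before ready processing, so a
	// "quiesce" message is not undone by an election triggered by the tick.
	if state&stateRaftRequest != 0 && p.processRequestQueue(ctx, id) {
		state |= stateRaftReady
	}
	if state&stateRaftTick != 0 && p.processTick(ctx, id) {
		state |= stateRaftReady
	}
	if state&stateRaftReady != 0 {
		p.processReady(ctx, id)
	}
}

type loggingProcessor struct{}

func (loggingProcessor) processRequestQueue(_ context.Context, id int64) bool {
	fmt.Println("processed requests for range", id)
	return true
}
func (loggingProcessor) processTick(_ context.Context, id int64) bool { return false }
func (loggingProcessor) processReady(_ context.Context, id int64)     { fmt.Println("ready for range", id) }

func main() {
	step(context.Background(), loggingProcessor{}, 42, stateRaftRequest|stateRaftTick)
}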
3 changes: 2 additions & 1 deletion pkg/storage/scheduler_test.go
@@ -142,10 +142,11 @@ func (p *testProcessor) processReady(_ context.Context, rangeID roachpb.RangeID)
p.mu.Unlock()
}

func (p *testProcessor) processRequestQueue(_ context.Context, rangeID roachpb.RangeID) {
func (p *testProcessor) processRequestQueue(_ context.Context, rangeID roachpb.RangeID) bool {
p.mu.Lock()
p.mu.raftRequest[rangeID]++
p.mu.Unlock()
return false
}

func (p *testProcessor) processTick(_ context.Context, rangeID roachpb.RangeID) bool {
54 changes: 19 additions & 35 deletions pkg/storage/store_raft.go
@@ -418,38 +418,28 @@ func (s *Store) enqueueRaftUpdateCheck(rangeID roachpb.RangeID) {
s.scheduler.EnqueueRaftReady(rangeID)
}

func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID) {
func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID) bool {
value, ok := s.replicaQueues.Load(int64(rangeID))
if !ok {
return
return false
}
q := (*raftRequestQueue)(value)
q.Lock()
infos := q.infos
q.infos = nil
q.Unlock()
if len(infos) == 0 {
return false
}

var lastRepl *Replica
var hadError bool
for i, info := range infos {
last := i == len(infos)-1
pErr := s.withReplicaForRequest(ctx, info.req, func(ctx context.Context, r *Replica) *roachpb.Error {
// Save the last Replica we see, since we don't know in advance which
// requests will fail during Replica retrieval. We want this later
// so we can handle the Raft Ready state all at once.
lastRepl = r
pErr := s.processRaftRequestWithReplica(ctx, r, info.req)
if last {
// If this is the last request, we can handle raft.Ready without
// giving up the lock. Set lastRepl to nil, so we don't handle it
// down below as well.
lastRepl = nil
_, expl, err := r.handleRaftReadyRaftMuLocked(ctx, noSnap)
maybeFatalOnRaftReadyErr(ctx, expl, err)
}
return pErr
})
if pErr != nil {
for i := range infos {
info := &infos[i]
if pErr := s.withReplicaForRequest(
ctx, info.req, func(ctx context.Context, r *Replica) *roachpb.Error {
return s.processRaftRequestWithReplica(ctx, r, info.req)
},
); pErr != nil {
hadError = true
if err := info.respStream.Send(newRaftMessageResponse(info.req, pErr)); err != nil {
// Seems excessive to log this on every occurrence as the other side
@@ -479,18 +469,10 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID
}
}

// If lastRepl is not nil, that means that some of the requests succeeded during
// Replica retrieval (withReplicaForRequest) but that the last request did not,
// otherwise we would have handled this above and set lastRepl to nil.
if lastRepl != nil {
// lastRepl will be unlocked when we exit withReplicaForRequest above.
// It's fine to relock it here (by calling handleRaftReady instead of
// handleRaftReadyRaftMuLocked) since racing to handle Raft Ready won't
// have any undesirable results.
ctx = lastRepl.AnnotateCtx(ctx)
_, expl, err := lastRepl.handleRaftReady(ctx, noSnap)
maybeFatalOnRaftReadyErr(ctx, expl, err)
}
// NB: Even if we had errors and the corresponding replica no longer
// exists, returning true here won't cause a new, uninitialized replica
// to be created in processReady().
return true // ready
}

func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) {
@@ -676,7 +658,8 @@ func (s *Store) sendQueuedHeartbeatsToNode(
log.Fatal(ctx, "cannot coalesce both heartbeats and responses")
}

chReq := &RaftMessageRequest{
chReq := newRaftMessageRequest()
*chReq = RaftMessageRequest{
RangeID: 0,
ToReplica: roachpb.ReplicaDescriptor{
NodeID: to.NodeID,
@@ -699,6 +682,7 @@ }
}

if !s.cfg.Transport.SendAsync(chReq, rpc.SystemClass) {
chReq.release()
for _, beat := range beats {
if value, ok := s.mu.replicas.Load(int64(beat.RangeID)); ok {
(*Replica)(value).addUnreachableRemoteReplica(beat.ToReplicaID)
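processRequestQueue above drains its queue with a take-and-clear swap under the lock and then processes the drained slice without holding the lock, returning false early when nothing was queued. Below is a self-contained sketch of that drain pattern, with stand-in types for raftRequestQueue and raftRequestInfo.

package main

import (
	"fmt"
	"sync"
)

type info struct{ payload string }

type requestQueue struct {
	sync.Mutex
	infos []info
}

// drain returns every queued item and leaves the queue empty. The returned
// slice is processed without holding the lock, so senders are never blocked
// behind request processing.
func (q *requestQueue) drain() []info {
	q.Lock()
	infos := q.infos
	q.infos = nil
	q.Unlock()
	return infos
}

func main() {
	q := &requestQueue{infos: []info{{"a"}, {"b"}}}
	infos := q.drain()
	if len(infos) == 0 {
		return // nothing queued; the caller reports no ready processing needed
	}
	for i := range infos {
		fmt.Println("processing", infos[i].payload)
	}
}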