Skip to content

Commit

Permalink
storage: pool allocations of RaftMessageRequest objects
Browse files Browse the repository at this point in the history
This was the top allocator in a run of Sysbench's `oltp_insert` workload.

Release note: None
  • Loading branch information
nvanbenschoten committed Oct 10, 2019
1 parent 6cd987e commit ec3ae9e
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 6 deletions.
16 changes: 16 additions & 0 deletions pkg/storage/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
"context"
"fmt"
"sync"

"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -207,6 +208,21 @@ func raftEntryFormatter(data []byte) string {
return fmt.Sprintf("[%x] [%d]", commandID, len(data))
}

var raftMessageRequestPool = sync.Pool{
New: func() interface{} {
return &RaftMessageRequest{}
},
}

func newRaftMessageRequest() *RaftMessageRequest {
return raftMessageRequestPool.Get().(*RaftMessageRequest)
}

func (m *RaftMessageRequest) release() {
*m = RaftMessageRequest{}
raftMessageRequestPool.Put(m)
}

// IsPreemptive returns whether this is a preemptive snapshot or a Raft
// snapshot.
func (h *SnapshotRequest_Header) IsPreemptive() bool {
Expand Down
7 changes: 5 additions & 2 deletions pkg/storage/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,13 +492,15 @@ func (t *RaftTransport) processQueue(
return err
case req := <-ch:
batch.Requests = append(batch.Requests, *req)
req.release()
// Pull off as many queued requests as possible.
//
// TODO(peter): Think about limiting the size of the batch we send.
for done := false; !done; {
select {
case req = <-ch:
batch.Requests = append(batch.Requests, *req)
req.release()
default:
done = true
}
Expand Down Expand Up @@ -532,8 +534,9 @@ func (t *RaftTransport) getQueue(
// SendAsync sends a message to the recipient specified in the request. It
// returns false if the outgoing queue is full. The returned bool may be a false
// positive but will never be a false negative; if sent is true the message may
// or may not actually be sent but if it's false the message definitely was
// not sent.
// or may not actually be sent but if it's false the message definitely was not
// sent. If the method does return true, it is not safe to continue using the
// reference to the provided request.
func (t *RaftTransport) SendAsync(req *RaftMessageRequest, class rpc.ConnectionClass) (sent bool) {
toNodeID := req.ToReplica.NodeID
stats := t.getStats(toNodeID, class)
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/raft_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,9 @@ func TestSendAndReceive(t *testing.T) {
ReplicaID: replicaIDs[toStoreID],
},
}
if !transports[storeNodes[fromStoreID]].SendAsync(expReq, rpc.DefaultClass) {
// NB: argument passed to SendAsync is not safe to use after; make a copy.
expReqCopy := *expReq
if !transports[storeNodes[fromStoreID]].SendAsync(&expReqCopy, rpc.DefaultClass) {
t.Errorf("unable to send message from %d to %d", fromStoreID, toStoreID)
}
// NB: proto.Equal will panic here since it doesn't know about `gogoproto.casttype`.
Expand Down
7 changes: 5 additions & 2 deletions pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,13 +1167,16 @@ func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) {
return
}

if !r.sendRaftMessageRequest(ctx, &RaftMessageRequest{
req := newRaftMessageRequest()
*req = RaftMessageRequest{
RangeID: r.RangeID,
ToReplica: toReplica,
FromReplica: fromReplica,
Message: msg,
RangeStartKey: startKey, // usually nil
}) {
}
if !r.sendRaftMessageRequest(ctx, req) {
req.release()
if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) {
r.mu.droppedMessages++
raftGroup.ReportUnreachable(msg.To)
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/store_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,8 @@ func (s *Store) sendQueuedHeartbeatsToNode(
log.Fatal(ctx, "cannot coalesce both heartbeats and responses")
}

chReq := &RaftMessageRequest{
chReq := newRaftMessageRequest()
*chReq = RaftMessageRequest{
RangeID: 0,
ToReplica: roachpb.ReplicaDescriptor{
NodeID: to.NodeID,
Expand All @@ -677,6 +678,7 @@ func (s *Store) sendQueuedHeartbeatsToNode(
}

if !s.cfg.Transport.SendAsync(chReq, rpc.SystemClass) {
chReq.release()
for _, beat := range beats {
if value, ok := s.mu.replicas.Load(int64(beat.RangeID)); ok {
(*Replica)(value).addUnreachableRemoteReplica(beat.ToReplicaID)
Expand Down

0 comments on commit ec3ae9e

Please sign in to comment.