Skip to content

Commit

Permalink
storage: coalesce heartbeats
Browse files Browse the repository at this point in the history
Coalesce heartbeats and heartbeat responses bound for the same store
into a single proto. Introduce a new environment flag (default: true
to enable this in integration tests, will be changed to false before
merging) COCKROACH_ENABLE_COALESCED_HEARTBEATS to turn this feature
on. Added metrics and a graph in the admin UI to track the number of
queued heartbeats waiting to be coalesced.
  • Loading branch information
Arjun Narayan committed Sep 26, 2016
1 parent 3e55a5d commit 5a2856f
Show file tree
Hide file tree
Showing 10 changed files with 602 additions and 78 deletions.
1 change: 1 addition & 0 deletions storage/api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1691,6 +1691,7 @@ func TestRaftAfterRemoveRange(t *testing.T) {
return nil
})

// Test that a coalesced heartbeat is ingested correctly
replica1 := roachpb.ReplicaDescriptor{
ReplicaID: roachpb.ReplicaID(mtc.stores[1].StoreID()),
NodeID: roachpb.NodeID(mtc.stores[1].StoreID()),
Expand All @@ -1702,14 +1703,19 @@ func TestRaftAfterRemoveRange(t *testing.T) {
StoreID: mtc.stores[2].StoreID(),
}
mtc.transports[2].SendAsync(&storage.RaftMessageRequest{
RangeID: 0, // TODO(bdarnell): wtf is this testing?
RangeID: 0,
ToReplica: replica1,
FromReplica: replica2,
Message: raftpb.Message{
From: uint64(replica2.ReplicaID),
To: uint64(replica1.ReplicaID),
Type: raftpb.MsgHeartbeat,
},
Heartbeats: []storage.RaftHeartbeat{
{
RangeID: rangeID,
FromReplicaID: replica2.ReplicaID,
ToReplicaID: replica1.ReplicaID,
},
},
})
// Execute another replica change to ensure that raft has processed
// the heartbeat just sent.
Expand Down
17 changes: 12 additions & 5 deletions storage/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,14 @@ var (
Name: "raft.rcvd.dropped",
Help: "Number of dropped incoming Raft messages",
}

metaRaftEnqueuedPending = metric.Metadata{Name: "raft.enqueued.pending",
Help: "Number of pending outgoing messages in the Raft Transport queue"}
metaRaftEnqueuedPending = metric.Metadata{
Name: "raft.enqueued.pending",
Help: "Number of pending outgoing messages in the Raft Transport queue",
}
metaRaftCoalescedHeartbeatsPending = metric.Metadata{
Name: "raft.heartbeats.pending",
Help: "Number of pending heartbeats and responses waiting to be coalesced",
}

// Replica queue metrics.
metaGCQueueSuccesses = metric.Metadata{Name: "queue.gc.process.success",
Expand Down Expand Up @@ -299,7 +304,8 @@ type StoreMetrics struct {
// TODO(arjun): eliminate this duplication.
raftRcvdMessages map[raftpb.MessageType]*metric.Counter

RaftEnqueuedPending *metric.Gauge
RaftEnqueuedPending *metric.Gauge
RaftCoalescedHeartbeatsPending *metric.Gauge

// Replica queue metrics.
GCQueueSuccesses *metric.Counter
Expand Down Expand Up @@ -421,7 +427,8 @@ func newStoreMetrics() *StoreMetrics {
RaftRcvdMsgDropped: metric.NewCounter(metaRaftRcvdDropped),
raftRcvdMessages: make(map[raftpb.MessageType]*metric.Counter, len(raftpb.MessageType_name)),

RaftEnqueuedPending: metric.NewGauge(metaRaftEnqueuedPending),
RaftEnqueuedPending: metric.NewGauge(metaRaftEnqueuedPending),
RaftCoalescedHeartbeatsPending: metric.NewGauge(metaRaftCoalescedHeartbeatsPending),

// Replica queue metrics.
GCQueueSuccesses: metric.NewCounter(metaGCQueueSuccesses),
Expand Down
Loading

0 comments on commit 5a2856f

Please sign in to comment.