
storage: coalesce heartbeats #9380

Merged
rjnn merged 1 commit into cockroachdb:master from coalesce_simple on Oct 20, 2016
Conversation

rjnn
Contributor

@rjnn rjnn commented Sep 14, 2016

Coalesce heartbeats and heartbeat responses bound for the same store
into a single proto. Introduce a new environment flag (default: true
to enable this in integration tests, will be changed to false before
merging) COCKROACH_ENABLE_COALESCED_HEARTBEATS to turn this feature
on. Added metrics and a graph in the admin UI to track the number of
queued heartbeats waiting to be coalesced.

Unlike the earlier proposal for coalesced heartbeats which would contain zero
additional information, this version sends large heartbeat packets that wrap up
all the individual heartbeat messages.

I would appreciate feedback on the following points:

  1. Should I use a slice locked by a mutex for the heartbeat message queue,
    as opposed to the current channel based implementation?
  2. Am I including sufficient information in the reconstructed heartbeat message?
  3. Are there any other metrics that we would like to collect and graph to make this clearer?

This change is Reviewable

@petermattis
Collaborator

Review status: 0 of 10 files reviewed at latest revision, 13 unresolved discussions, some commit checks failed.


storage/raft.proto, line 32 at r1 (raw file):

      (gogoproto.casttype) = "github.com/cockroachdb/cockroach/roachpb.RangeID"];
  optional roachpb.ReplicaDescriptor from_replica = 2 [(gogoproto.nullable) = false];
  optional roachpb.ReplicaDescriptor to_replica = 3 [(gogoproto.nullable) = false];

Lots of duplicate (and unnecessary?) info in from_replica and to_replica. Can you just use the replica IDs here?


storage/raft.proto, line 34 at r1 (raw file):

  optional roachpb.ReplicaDescriptor to_replica = 3 [(gogoproto.nullable) = false];
  optional uint64 term = 4 [(gogoproto.nullable) = false];
  optional uint64 log_term = 5 [(gogoproto.nullable) = false];

What is the difference between term and log_term? I thought raft.Message only had term and commit set for heartbeats.


storage/raft.proto, line 36 at r1 (raw file):

  optional uint64 log_term = 5 [(gogoproto.nullable) = false];
  optional uint64 commit = 6 [(gogoproto.nullable) = false];
  optional bool response = 7 [(gogoproto.nullable) = false];

Might be better to use raft.MessageType here. Or to pull this out into the RaftMessageRequest struct. Something like:

message RaftMessageRequest {
  repeated RaftHeartbeat heartbeats = ...;
  repeated RaftHeartbeat heartbeat_resps = ...;
}

storage/replica.go, line 1720 at r1 (raw file):

}

func (r *Replica) enqueueHeartbeat(msg raftpb.Message, isResponse bool) {

No need for isResponse. msg.Type will indicate whether you have a MsgHeartbeat or a MsgHeartbeatResp.


storage/replica.go, line 1727 at r1 (raw file):

  r.mu.Unlock()
  if toErr != nil {
      log.Warningf(r.ctx, "cannot lookup ReplicaDescriptor for replica %d", msg.To)

Is continuing here appropriate? sendRaftMessage returns in a similar situation.


storage/replica.go, line 1742 at r1 (raw file):

      Response:    isResponse,
  }
  if len(r.store.coalescedHeartbeatChan) == coalescedHeartbeatChannelSize {

This is racy and can block if you lose the race. The better structure is something along the lines of:

select {
  case r.store.coalescedHeartbeatChan <- h:
  default:
    select {
      case r.store.earlyCoalesceHeartbeats <- struct{}{}:
      default:
    }
}

But really this makes me suspect that using a buffered channel for coalescedHeartbeatChan is not appropriate and that you should be queueing up the heartbeat into a slice protected by a mutex.
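A minimal sketch of that slice-behind-a-mutex alternative (the heartbeatQueue type and its names are placeholders for illustration, not code from this PR):

import "sync"

// Sketch only: append heartbeats under a mutex, drain them in one batch later.
type heartbeatQueue struct {
  mu      sync.Mutex
  pending []RaftHeartbeat
}

func (q *heartbeatQueue) enqueue(h RaftHeartbeat) {
  q.mu.Lock()
  q.pending = append(q.pending, h)
  q.mu.Unlock()
}

// drain hands back everything queued so far and resets the queue.
func (q *heartbeatQueue) drain() []RaftHeartbeat {
  q.mu.Lock()
  beats := q.pending
  q.pending = nil
  q.mu.Unlock()
  return beats
}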


storage/replica.go, line 1913 at r1 (raw file):

      default:
          r.sendRaftMessage(msg)
      }

I would have enqueueHeartbeat check msg.Type and return a boolean if the message was handled. Then you could replace the above with:

  if !r.maybeEnqueueHeartbeat(msg) {
    r.sendRaftMessage(msg)
  }

Or, given that this is the only place sendRaftMessage is called from, move all of the logic into sendRaftMessage.


storage/store.go, line 298 at r1 (raw file):

  intentResolver          *intentResolver
  raftEntryCache          *raftEntryCache
  // TODO(arjun): Should this be a slice protected with its own mutex, instead of a channel?

Yes.


storage/store.go, line 2336 at r1 (raw file):

      (!(req.Message.Type == raftpb.MsgHeartbeat) || (req.Message.Type == raftpb.MsgHeartbeatResp)) {
      s.metrics.raftRcvdMessages[req.Message.Type].Inc(1)
  }

Couldn't you just move this metrics accounting below the coalesced heartbeat fanout?


storage/store.go, line 2749 at r1 (raw file):

}

func (s *Store) coalescedHeartbeatLoop() {

Is there an advantage to making this separate from raftTickLoop? I was imagining that on every tick loop iteration you'd flush any buffered heartbeats before the next set of ticks.
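For illustration, flushing inside the tick loop could look roughly like this (a sketch; sendQueuedHeartbeats and EnqueueRaftTick are the names that appear later in this review, the helper name is hypothetical, and the ordering of the two calls is discussed further below):

// Sketch only: flush buffered heartbeats as part of each tick-loop iteration
// instead of running a separate coalescedHeartbeatLoop goroutine.
func (s *Store) tickAndFlushHeartbeats(rangeIDs []roachpb.RangeID) {
  // Send whatever was coalesced since the previous tick first...
  s.sendQueuedHeartbeats()
  // ...then schedule the next round of ticks, which will queue new heartbeats.
  s.scheduler.EnqueueRaftTick(rangeIDs...)
}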


storage/store.go, line 2772 at r1 (raw file):

  s.metrics.RaftCoalescedHeartbeatsPending.Update(int64(len(s.coalescedHeartbeatChan)))
  s.ctx.Transport.mu.Lock()
  chs := make(map[roachpb.NodeID][]RaftHeartbeat, len(s.ctx.Transport.mu.queues[false]))

Since you need per-node heartbeat lists here, rather than a single slice of queued heartbeats in Store, you should maintain this map. That would get rid of a decent chunk of the work here.


storage/store.go, line 2796 at r1 (raw file):

          ToReplica: roachpb.ReplicaDescriptor{
              NodeID:    beats[0].ToReplica.NodeID,
              StoreID:   beats[0].ToReplica.StoreID,

Per an earlier comment, since beats[i].ToReplica.{Node,Store}ID will be constant for the entire request, there is no need to be passing it in every beats[i]. That is, RaftHeartbeat only needs to contain a ToReplicaID, not the entire replica descriptor.


storage/store.go, line 2806 at r1 (raw file):

      if !s.ctx.Transport.SendAsync(chReq) {
          log.Warning(s.Ctx(), "TODO(arjun): What do we do if we can't send a coalesced heartbeat?")

Maybe loop over all of the replicas and call ReportUnreachable.


Comments from Reviewable

@rjnn rjnn force-pushed the coalesce_simple branch 3 times, most recently from 73f77be to 0f479fc on September 16, 2016 19:32
@rjnn
Contributor Author

rjnn commented Sep 16, 2016

Reworked; now failing some MVCCStats checks, which I'm currently debugging, but it could use a review due to the architecture change from channels to maps.


Review status: 0 of 10 files reviewed at latest revision, 13 unresolved discussions, some commit checks failed.


storage/raft.proto, line 32 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Lots of duplicate (and unnecessary?) info in from_replica and to_replica. Can you just use the replica IDs here?

Done.

storage/raft.proto, line 34 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

What is the difference between term and log_term? I thought raft.Message only had term and commit set for heartbeats.

I've removed `log_term`. I examined the `etcd/raft` code and your suggestion is correct. `MsgHeartbeat` only has `term` and `commit` set.
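Concretely, building the coalesced representation then only needs those two fields plus the routing IDs (a sketch; the exact RaftHeartbeat field names here are an assumption based on this review's discussion):

// Sketch only: MsgHeartbeat carries just Term and Commit, so nothing else
// from the raftpb.Message needs to be copied into the coalesced form.
beat := RaftHeartbeat{
  RangeID:       r.RangeID,
  FromReplicaID: fromReplica.ReplicaID,
  ToReplicaID:   toReplica.ReplicaID,
  Term:          msg.Term,
  Commit:        msg.Commit,
}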

storage/raft.proto, line 36 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Might be better to use raft.MessageType here. Or to pull this out into the RaftMessageRequest struct. Something like:

message RaftMessageRequest {
  repeated RaftHeartbeat heartbeats = ...;
  repeated RaftHeartbeat heartbeat_resps = ...;
}
Done (your second proposal).

storage/replica.go, line 1720 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

No need for isResponse. msg.Type will indicate whether you have a MsgHeartbeat or a MsgHeartbeatResp.

Done.

storage/replica.go, line 1727 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Is continuing here appropriate? sendRaftMessage returns in a similar situation.

Done. (code has been moved, however)

storage/replica.go, line 1742 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

This is racy and can block if you lose the race. The better structure is something along the lines of:

select {
  case r.store.coalescedHeartbeatChan <- h:
  default:
    select {
      case r.store.earlyCoalesceHeartbeats <- struct{}{}:
      default:
    }
}

But really this makes me suspect that using a buffered channel for coalescedHeartbeatChan is not appropriate and that you should be queueing up the heartbeat into a slice protected by a mutex.

Changed to a map protected by a mutex.

storage/replica.go, line 1913 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

I would have enqueueHeartbeat check msg.Type and return a boolean if the message was handled. Then you could replace the above with:

  if !r.maybeEnqueueHeartbeat(msg) {
    r.sendRaftMessage(msg)
  }

Or, given that this is the only place sendRaftMessage is called from, move all of the logic into sendRaftMessage.

Done. Logic moved there.

storage/store.go, line 298 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Yes.

Done, but used a map instead of a slice to keep `StoreIdent` information outside the proto.
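Roughly, that gives Store a field along these lines (a sketch reconstructed from snippets quoted later in this review, e.g. r.store.coalesced.heartbeats and the later coalescedMu rename; the exact mutex type and field names are assumptions):

type Store struct {
  // ...existing fields elided...

  // Sketch only: pending heartbeats and responses, bucketed by destination
  // store so each bucket becomes a single request, behind their own mutex.
  coalescedMu struct {
    sync.Mutex
    heartbeats         map[roachpb.StoreIdent][]RaftHeartbeat
    heartbeatResponses map[roachpb.StoreIdent][]RaftHeartbeat
  }
}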

storage/store.go, line 2336 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Couldn't you just move this metrics accounting below the coalesced heartbeat fanout?

Done.

storage/store.go, line 2749 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Is there an advantage to making this separate from raftTickLoop? I was imagining that on every tick loop iteration you'd flush any buffered heartbeats before the next set of ticks.

There isn't any more since with a slice locked by a mutex (which can grow), we don't ever have to trigger an early `coalescedHeartbeatLoop`. Removed.

storage/store.go, line 2772 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Since you need per-node heartbeat lists here, rather than a single slice of queued heartbeats in Store, you should maintain this map. That would get rid of a decent chunk of the work here.

Done, though per-store instead of per-node (per-node would involve extra uncoalescing steps at the node level as well).

storage/store.go, line 2796 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Per an earlier comment, since beats[i].ToReplica.{Node,Store}ID will be constant for the entire request, there is no need to be passing it in every beats[i]. That is, RaftHeartbeat only needs to contain a ToReplicaID, not the entire replica descriptor.

Done.

storage/store.go, line 2806 at r1 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Maybe loop over all of the replicas and call ReportUnreachable.

Done.

Comments from Reviewable

@petermattis
Collaborator

Review status: 0 of 10 files reviewed at latest revision, 19 unresolved discussions, some commit checks failed.


storage/raft.proto, line 67 at r2 (raw file):

  // range ID should be zero.
  repeated RaftHeartbeat heartbeats = 6 [(gogoproto.nullable) = false];
  repeated RaftHeartbeat heartbeat_resps = 7 [(gogoproto.nullable) = false];

I'm not sure if this is allowed, but if it is, add packed = true to both of these repeated fields.


storage/replica.go, line 2266 at r2 (raw file):

  }

  if enableCoalescedHeartbeats && (msg.Type == raftpb.MsgHeartbeat || msg.Type == raftpb.MsgHeartbeatResp) {

You should move this into a maybeCoalesceHeartbeat method. More importantly, we're almost never hitting this code path with quiescence enabled.


storage/replica.go, line 2283 at r2 (raw file):

          toSlice := r.store.coalesced.heartbeats[toStore]
          if toSlice == nil {
              r.store.coalesced.heartbeats[toStore] = []RaftHeartbeat{}

This isn't necessary. Appending to a nil slice is the same as appending to an empty slice.
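For example, with the types from this PR:

var beats []RaftHeartbeat              // nil slice
beats = append(beats, RaftHeartbeat{}) // fine: append allocates as needed
other := []RaftHeartbeat{}             // empty, non-nil slice
other = append(other, RaftHeartbeat{}) // behaves exactly the same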


storage/store.go, line 2338 at r2 (raw file):

      // this recursive call back into HandleRaftRequest has len(...) == 0.
      if err := s.HandleRaftRequest(s.Ctx(), beatReq, respStream); err != nil {
          return err

Should failure to fanout one heartbeat stop processing of the remainder?


storage/store.go, line 2353 at r2 (raw file):

  if len(req.Heartbeats) > 0 {
      return s.fanoutCoalescedHeartbeats(req, respStream)

I wonder if this should be moved to the caller. It is slightly odd to see HandleRaftRequest -> fanoutCoalescedHeartbeats -> HandleRaftRequest.


storage/store.go, line 2757 at r2 (raw file):

              s.scheduler.EnqueueRaftTick(rangeIDs...)
              s.sendQueuedHeartbeats()

I would send the queued heartbeats first. By enqueuing the tick first you're going to be ticking and queueing up some additional heartbeats before sending.


Comments from Reviewable

@petermattis
Collaborator

It might be useful to rework this in light of quiescence and only coalesce MsgHeartbeat. MsgHeartbeatResp traffic is immaterial. Also might want to think about doing this only for quiesce heartbeats if that provides a simplification as well.


Review status: 0 of 10 files reviewed at latest revision, 19 unresolved discussions, some commit checks failed.


Comments from Reviewable

@bdarnell
Contributor

When most ranges are quiesced, MsgHeartbeatResp traffic is immaterial. But in that case, so is the traffic of coalescable MsgHeartbeats: Most heartbeats we send are quiesce messages, which are sent individually instead of being coalesced. Whenever we have a non-trivial volume of coalescable heartbeats, we will have an equal volume of responses.

I think that quiescence largely undermines the argument for coalescing heartbeats - I don't think it's going to help much in the steady state of our current test clusters. Where it might help, though, is in cases like #9446 in which quiescing doesn't work as well.

I'm concerned that sending per-range information in the heartbeats is not going to save enough relative to the non-coalesced case. In the million-ranges-per-store case that motivated coalesced heartbeats, those messages would still be many megabytes (of course, we're not to that scale yet).

What if we batched all raft messages by introducing a message RaftMessageRequests { repeated RaftMessageRequest requests = 1; }? This would avoid the system call overhead (which I think is the biggest cost of the heartbeats currently), and would work for the MsgApp and MsgAppResp messages that make up most of the traffic in a heavily-loaded cluster, although it wouldn't save as much space on the wire as this change.


Review status: 0 of 10 files reviewed at latest revision, 19 unresolved discussions, some commit checks failed.


Comments from Reviewable

@petermattis
Collaborator

Re: batching all Raft messages: heartbeats can be delayed for brief periods without issue. But don't we want to send MsgApp and MsgAppResp as quickly as possible? I suppose we could make a small tweak to RaftTransport to pull as many messages from the queue as are available and send those in a single RPC.

@bdarnell
Contributor

Yeah, I was thinking we'd read from the channel until we reached our target message size or it would block, and then send that batch. The channel buffer would have time to fill up while we Send() the previous batch.
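A rough sketch of that drain-until-blocked pattern (the function and parameter names here are hypothetical, not RaftTransport's actual API):

// Sketch only: collect everything immediately available on the queue, up to
// a target batch size, then let the caller send the batch in a single RPC.
func drainBatch(ch <-chan *RaftMessageRequest, maxBatch int) []*RaftMessageRequest {
  // Block for the first message so we never send an empty batch.
  batch := []*RaftMessageRequest{<-ch}
  for len(batch) < maxBatch {
    select {
    case req := <-ch:
      batch = append(batch, req)
    default:
      // Queue momentarily empty: send what we have while it refills.
      return batch
    }
  }
  return batch
}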

@rjnn rjnn force-pushed the coalesce_simple branch 2 times, most recently from 5f9adb6 to fbdb90c on September 26, 2016 18:30
@rjnn
Contributor Author

rjnn commented Sep 26, 2016

I think quiescence and coalesced heartbeats complement each other well - quiescence ensures that we never have a million active ranges per store, and coalesced heartbeats ensure that when we do have a few thousand active ranges per store, we keep network traffic down.
I agree with Peter that we don't want to slow down MsgApp traffic (although there is an argument to be made for coalescing range leases, node leases do solve that through a different mechanism), so coalescing them isn't great.
I agree that the ideal world is one where we don't send per-range information, but I think this is a good intermediate step to get to that point.

Test failures: currently failing TestFailedReplicaChange, TestConcurrentSnapshots, and TestReplicateAfterTruncation, which are all snapshot related. I don't know where the bug is, since I didn't touch anything snapshot related...


Review status: 0 of 10 files reviewed at latest revision, 19 unresolved discussions, some commit checks pending.


storage/raft.proto, line 67 at r2 (raw file):

Previously, petermattis (Peter Mattis) wrote…

I'm not sure if this is allowed, but if it is, add packed = true to both of these repeated fields.

Not allowed, `[packed = true] can only be specified for repeated primitive fields.`

storage/replica.go, line 2266 at r2 (raw file):

Previously, petermattis (Peter Mattis) wrote…

You should move this into a maybeCoalesceHeartbeat method. More importantly, we're almost never hitting this code path with quiescence enabled.

Done.

storage/replica.go, line 2283 at r2 (raw file):

Previously, petermattis (Peter Mattis) wrote…

This isn't necessary. Appending to a nil slice is the same as appending to an empty slice.

Done.

storage/store.go, line 2338 at r2 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Should failure to fanout one heartbeat stop processing of the remainder?

What's the best solution here if we have multiple errors? Log it and continue anyway? That's what I'm doing now.

storage/store.go, line 2353 at r2 (raw file):

Previously, petermattis (Peter Mattis) wrote…

I wonder if this should be moved to the caller. It is slightly odd to see HandleRaftRequest -> fanoutCoalescedHeartbeats -> HandleRaftRequest.

Done.

storage/store.go, line 2757 at r2 (raw file):

Previously, petermattis (Peter Mattis) wrote…

I would send the queued heartbeats first. By enqueuing the tick first you're going to be ticking and queueing up some additional heartbeats before sending.

Done.

Comments from Reviewable

@rjnn rjnn force-pushed the coalesce_simple branch 2 times, most recently from d78d223 to 5a2856f on September 26, 2016 20:23
@tamird
Contributor

tamird commented Sep 26, 2016

Reviewed 1 of 10 files at r1, 1 of 9 files at r2, 7 of 8 files at r3, 1 of 1 files at r4.
Review status: all files reviewed at latest revision, 25 unresolved discussions, some commit checks failed.


storage/replica.go, line 2323 at r4 (raw file):

}

func (r *Replica) maybeCoalesceHeartbeats(msg raftpb.Message,

wrap this in accordance with the style guide.


storage/replica.go, line 2325 at r4 (raw file):

func (r *Replica) maybeCoalesceHeartbeats(msg raftpb.Message,
  toReplica, fromReplica roachpb.ReplicaDescriptor) bool {
  if enableCoalescedHeartbeats && (msg.Type == raftpb.MsgHeartbeat || msg.Type == raftpb.MsgHeartbeatResp) {

this construction is oddly repetitive. How about:

if enableCoalescedHeartbeats {
  var hbMap map[roachpb.StoreIdent][]RaftHeartbeat
  switch msg.Type {
  case raftpb.MsgHeartbeat:
    hbMap = r.store.coalesced.heartbeats
  case raftpb.MsgHeartbeatResp:
    hbMap = r.store.coalesced.heartbeatsResponses
  default:
    return false
  }
  beat := ...
  toStore := ...
  ...Lock()
  hbMap[toStore] = append(hbMap[toStore], beat)
  ...Unlock()
  return true
}
return false

storage/store.go, line 297 at r4 (raw file):

  raftEntryCache          *raftEntryCache

  coalesced struct {

you should call this coalescedMu


storage/store.go, line 2383 at r4 (raw file):

          // back into HandleRaftRequest has len(...) == 0.
          if err := s.HandleRaftRequest(s.Ctx(), beatReq, respStream); err != nil {
              log.Errorf(s.Ctx(), "could not send coalescedHeartbeat %v", err)

why is this s.Ctx() and not ctx?
also, %s for non-nil errors


storage/store.go, line 2413 at r4 (raw file):

          // back into HandleRaftRequest has len(...) == 0.
          if err := s.HandleRaftRequest(s.Ctx(), respReq, respStream); err != nil {
              log.Errorf(s.Ctx(), "could not send coalescedHeartbeat %v", err)

why is this s.Ctx() and not ctx?
also, %s for non-nil errors


storage/store.go, line 2971 at r4 (raw file):

func (s *Store) sendQueuedHeartbeatsToNode(
  beats, resps []RaftHeartbeat,
  to roachpb.StoreIdent, accum int) int {

wrap this in accordance with the style guide.


Comments from Reviewable

@rjnn rjnn force-pushed the coalesce_simple branch 4 times, most recently from 3e677f6 to ef6c024 on September 27, 2016 14:50
@rjnn
Contributor Author

rjnn commented Sep 27, 2016

Ready for another review. Only failing TestRaftAfterRemoveRace now. Any help debugging that would be appreciated!


Review status: 5 of 10 files reviewed at latest revision, 25 unresolved discussions, some commit checks pending.


storage/replica.go, line 2323 at r4 (raw file):

Previously, tamird (Tamir Duberstein) wrote…

wrap this in accordance with the style guide.

Done.

storage/replica.go, line 2325 at r4 (raw file):

Previously, tamird (Tamir Duberstein) wrote…

this construction is oddly repetitive. How about:

if enableCoalescedHeartbeats {
  var hbMap map[roachpb.StoreIdent][]RaftHeartbeat
  switch msg.Type {
  case raftpb.MsgHeartbeat:
    hbMap = r.store.coalesced.heartbeats
  case raftpb.MsgHeartbeatResp:
    hbMap = r.store.coalesced.heartbeatsResponses
  default:
    return false
  }
  beat := ...
  toStore := ...
  ...Lock()
  hbMap[toStore] = append(hbMap[toStore], beat)
  ...Unlock()
  return true
}
return false
Much cleaner! Done.

storage/store.go, line 297 at r4 (raw file):

Previously, tamird (Tamir Duberstein) wrote…

you should call this coalescedMu

Done.

storage/store.go, line 2383 at r4 (raw file):

Previously, tamird (Tamir Duberstein) wrote…

why is this s.Ctx() and not ctx?
also, %s for non-nil errors

Done.

storage/store.go, line 2413 at r4 (raw file):

Previously, tamird (Tamir Duberstein) wrote…

why is this s.Ctx() and not ctx?
also, %s for non-nil errors

Done.

storage/store.go, line 2971 at r4 (raw file):

Previously, tamird (Tamir Duberstein) wrote…

wrap this in accordance with the style guide.

Done.

Comments from Reviewable

@tamird
Contributor

tamird commented Sep 27, 2016

Reviewed 5 of 5 files at r5.
Review status: all files reviewed at latest revision, 20 unresolved discussions, some commit checks failed.


storage/store.go, line 2383 at r4 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

Done.

you fixed only one of the contexts.

Comments from Reviewable

@rjnn rjnn force-pushed the coalesce_simple branch 2 times, most recently from 1022dda to f4cf613 on September 27, 2016 17:04
@bdarnell
Contributor

:lgtm:

In the TestRaftRemoveRace failure, the log line W160927 17:16:45.360408 12103 storage/raft_transport.go:440 raft transport stream to node 0 failed: unknown peer 0 looks like the key - you're sending a message with a 0 node ID somewhere. Try making that a panic so you can see where it's coming from.


Reviewed 1 of 10 files at r1, 1 of 9 files at r2, 3 of 8 files at r3, 3 of 5 files at r5, 2 of 2 files at r6.
Review status: all files reviewed at latest revision, 21 unresolved discussions, some commit checks failed.


storage/raft.proto, line 32 at r1 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

Done.

We might be able to avoid sending even the replica IDs: the recipient could look up the StoreID (which is constant across all heartbeats in a message) in its local copy of the RangeDescriptor to get the ReplicaID. But I'm not sure if that's safe since range descriptors can be stale around replica changes.

storage/replica.go, line 2348 at r6 (raw file):

      }
      toStore := roachpb.StoreIdent{
          ClusterID: r.store.Ident.ClusterID,

We don't really need the ClusterID here, just the StoreID/NodeID pair (and maybe just the StoreID; we could look up the NodeID for the store from gossip later).


storage/store.go, line 2757 at r2 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

Done.

Don't we want to queue up the new heartbeats before sending? Heartbeats are generated by ticks, so if we send heartbeats before ticking then every heartbeat will be delayed by the tick interval (and responses will likely be delayed by a substantial fraction of the tick interval). If we were ticking synchronously instead of going through the scheduler, we'd want to send all the heartbeats as soon as we finished ticking, so we could send all of them in one batch without delay. The async scheduler makes this tricky.

Comments from Reviewable

@petermattis
Collaborator

Review status: all files reviewed at latest revision, 12 unresolved discussions, some commit checks failed.


storage/replica.go, line 2332 at r6 (raw file):

      switch msg.Type {
      case raftpb.MsgHeartbeat:
          r.store.coalescedMu.Lock()

I think you can do this locking after the switch statement. Even better to move it down to just before hbMap is accessed.


storage/store.go, line 2353 at r2 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

Done.

You're still calling `HandleRaftRequest` recursively. That feels weird to me.

storage/store.go, line 2757 at r2 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

Don't we want to queue up the new heartbeats before sending? Heartbeats are generated by ticks, so if we send heartbeats before ticking then every heartbeat will be delayed by the tick interval (and responses will likely be delayed by a substantial fraction of the tick interval). If we were ticking synchronously instead of going through the scheduler, we'd want to send all the heartbeats as soon as we finished ticking, so we could send all of them in one batch without delay. The async scheduler makes this tricky.

When we queue up the ticks they'll be processed asynchronously. There isn't a strong reason for doing it before or after.

Comments from Reviewable

@tamird
Contributor

tamird commented Sep 28, 2016

Reviewed 2 of 2 files at r6.
Review status: all files reviewed at latest revision, 11 unresolved discussions, some commit checks failed.


Comments from Reviewable

@rjnn rjnn force-pushed the coalesce_simple branch 2 times, most recently from ecc01bd to e3f8e85 on October 6, 2016 20:05
@bdarnell
Contributor

bdarnell commented Oct 9, 2016

Reviewed 1 of 9 files at r8, 1 of 5 files at r10, 1 of 7 files at r12, 1 of 2 files at r13, 1 of 3 files at r14, 5 of 5 files at r15.
Review status: all files reviewed at latest revision, 18 unresolved discussions, some commit checks failed.


Comments from Reviewable

@rjnn rjnn force-pushed the coalesce_simple branch 3 times, most recently from 6c40fe5 to b194fc4 on October 10, 2016 15:57
@rjnn rjnn force-pushed the coalesce_simple branch 10 times, most recently from a978f94 to aa2546e on October 20, 2016 15:47
@rjnn
Contributor Author

rjnn commented Oct 20, 2016

Moved the heartbeat coalescing loop to its own separate timer, which is set to run at 10x the speed of the raft tick loop. This makes heartbeating operate on a faster cadence than quiescing, which ensures that delayed heartbeat responses do not unquiesce ranges.
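A sketch of what that loop amounts to (startCoalescedHeartbeatsLoop and sendQueuedHeartbeats are names from this review; the interval parameter and stop channel stand in for the real stopper plumbing, whose signature this is not):

// Sketch only: flush coalesced heartbeats on their own, faster timer so that
// responses come back well within a raft tick and don't unquiesce ranges.
func (s *Store) startCoalescedHeartbeatsLoop(interval time.Duration, stop <-chan struct{}) {
  ticker := time.NewTicker(interval) // e.g. RaftTickInterval / 10
  defer ticker.Stop()
  for {
    select {
    case <-ticker.C:
      s.sendQueuedHeartbeats()
    case <-stop:
      return
    }
  }
}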


Review status: 0 of 11 files reviewed at latest revision, 19 unresolved discussions, some commit checks pending.


storage/replica.go, line 2494 at r13 (raw file):

// raft group.
func (r *Replica) reportUnreachable(remoteReplica roachpb.ReplicaID) {
  r.mu.Lock()

This is the offending line that causes a deadlock in (and resulting failure of) TestStopServer. Thanks to @andreimatei for help debugging this. @petermattis, you updated the locking in replica last - do you have an idea as to what's the best lock to hold in this situation? Both r.mu and r.raftMu cause problems ...


Comments from Reviewable

@tamird
Contributor

tamird commented Oct 20, 2016

Reviewed 20 of 21 files at r16.
Review status: all files reviewed at latest revision, 18 unresolved discussions, all commit checks successful.


Comments from Reviewable

@petermattis
Collaborator

Review status: all files reviewed at latest revision, 25 unresolved discussions, all commit checks successful.


pkg/storage/replica.go, line 2390 at r16 (raw file):

      hbMap = r.store.coalescedMu.heartbeats
  case raftpb.MsgHeartbeatResp:
      // TODO(peter): Coalescing heartbeat responses has a bad interaction with

This comment seems out of place given that you're not doing anything to address it here. And is this even still a TODO?


pkg/storage/store.go, line 523 at r16 (raw file):

  // are queued and then sent as a single coalesced heartbeat; it is a
  // fraction of the RaftTickInterval so that heartbeats don't get delayed by
  // an entire tick.

Need additional commentary here or somewhere explaining why delaying the heartbeat for an entire tick is problematic.


pkg/storage/store.go, line 664 at r16 (raw file):

  }
  if sc.CoalescedHeartbeatsInterval == 0 {
      sc.CoalescedHeartbeatsInterval = sc.RaftTickInterval / 10

How did you arrive at / 10? I would have guessed that / 2 would be sufficient.


pkg/storage/store.go, line 2456 at r16 (raw file):

  }
  if log.V(4) {
      log.Infof(ctx, "[s%d] uncoalescing %d beats of type %v: %+v", s.Ident.StoreID, len(beats), msgT, beats)

Doesn't ctx already contain s.Ident.StoreID?


pkg/storage/store.go, line 2483 at r16 (raw file):

      }
      if log.V(4) {
          log.Infof(ctx, "[s%d] uncoalesced beat: %+v", s.Ident.StoreID, beatReqs[i])

Ditto.


pkg/storage/store.go, line 2487 at r16 (raw file):

      if err := s.HandleRaftUncoalescedRequest(ctx, &beatReqs[i], respStream); err != nil {
          log.Errorf(ctx, "[s%d] could not handle uncoalesced heartbeat %s", s.Ident.StoreID, err)

Ditto.


pkg/storage/store.go, line 3078 at r16 (raw file):

  })

  s.raftTickLoop()

Should call startCoalescedHeartbeatsLoop here so that we don't bother starting that worker unless we're doing raft processing.


Comments from Reviewable

@rjnn
Contributor Author

rjnn commented Oct 20, 2016

Review status: 9 of 11 files reviewed at latest revision, 25 unresolved discussions.


pkg/storage/replica.go, line 2390 at r16 (raw file):

Previously, petermattis (Peter Mattis) wrote…

This comment seems out of place given that you're not doing anything to address it here. And is this even still a TODO?

Took the substance of the comment and moved it to the declaration of `CoalescedHeartbeatsInterval`. It is no longer a TODO.

pkg/storage/store.go, line 523 at r16 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Need additional commentary here or somewhere explaining why delaying the heartbeat for an entire tick is problematic.

Moved the substance of the old TODO here, since that contains a succinct explanation.

pkg/storage/store.go, line 664 at r16 (raw file):

Previously, petermattis (Peter Mattis) wrote…

How did you arrive at / 10? I would have guessed that / 2 would be sufficient.

I picked `/ 10` arbitrarily. I think it depends on network delay, and I wanted to start it off conservatively. Changed to `/ 2` for now. Do we have recommendations on what `RaftTickInterval` should be w.r.t. the expected latency between two machines?

pkg/storage/store.go, line 2456 at r16 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Doesn't ctx already contain s.Ident.StoreID?

Done.

pkg/storage/store.go, line 2483 at r16 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Ditto.

Done.

pkg/storage/store.go, line 2487 at r16 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Ditto.

Done.

pkg/storage/store.go, line 3078 at r16 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Should call startCoalescedHeartbeatsLoop here so that we don't bother starting that worker unless we're doing raft processing.

Done.

Comments from Reviewable

@petermattis
Collaborator

:lgtm:


Review status: 9 of 11 files reviewed at latest revision, 21 unresolved discussions.


pkg/storage/store.go, line 664 at r16 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

I picked / 10 arbitrarily. I think it depends on network delay, and I wanted to start it off conservatively. Changed to / 2 for now. Do we have recommendations on what RaftTickInterval should be w.r.t. the expected latency between two machines?

No, we don't have such a recommendation right now. The current `RaftTickInterval` setting will likely need to be changed.

pkg/storage/store.go, line 524 at r17 (raw file):

  // fraction of the RaftTickInterval so that heartbeats don't get delayed by
  // an entire tick. Delaying coalescing heartbeat responses has a bad
  //interaction with quiescence because the coalesced (delayed) heartbeat

s,//interaction,// interaction,g


pkg/storage/store.go, line 534 at r17 (raw file):

  //                                        follower sends MsgHeartbeatResp
  //      leader receives MsgHeartbeatResp
  // T+3: leader sends quiesce message

Thus we want to make sure that heartbeats are responded to faster than the quiesce cadence.


Comments from Reviewable

Coalesce heartbeats and heartbeat responses bound for the same store
into a single proto. Introduce a new environment flag (default: true)
COCKROACH_ENABLE_COALESCED_HEARTBEATS to turn this feature off. Added
metrics and a graph in the admin UI to track the number of queued
heartbeats waiting to be coalesced. The frequency of heartbeat
coalescing is controlled by a new timer, which is set by default to
run 10x per raft tick.
@rjnn
Contributor Author

rjnn commented Oct 20, 2016

Review status: 9 of 11 files reviewed at latest revision, 21 unresolved discussions.


pkg/storage/store.go, line 524 at r17 (raw file):

Previously, petermattis (Peter Mattis) wrote…

s,//interaction,// interaction,g

Done.

pkg/storage/store.go, line 534 at r17 (raw file):

Previously, petermattis (Peter Mattis) wrote…

Thus we want to make sure that heartbeats are responded to faster than the quiesce cadence.

Done.

Comments from Reviewable

@rjnn rjnn merged commit 598df37 into cockroachdb:master Oct 20, 2016
@rjnn rjnn deleted the coalesce_simple branch October 20, 2016 17:23
@bdarnell
Contributor

Review status: 9 of 11 files reviewed at latest revision, 19 unresolved discussions, all commit checks successful.


pkg/storage/store.go, line 664 at r16 (raw file):

Previously, petermattis (Peter Mattis) wrote…

No, we don't have such a recommendation right now. The current RaftTickInterval setting will likely need to be changed.

I thought I remembered reading a recommendation of 150-300ms for the heartbeat interval in the raft paper, but I can't find that now and don't remember what kind of network it was assuming.

The raft thesis recommends an election timeout of 10-20x the network RTT, which probably implies heartbeat intervals of 3-5x RTT. However, we may want to recommend larger intervals since we run many instances of raft.


Comments from Reviewable
