storage: tune raft.Config.{MaxSizePerMsg,MaxInflightMsgs}
The previous settings allowed up to 256 MB of Raft log entries to be
inflight to a follower, resulting in a single Replica.handleRaftReady
call processing thousands or tens of thousands of commands.

Log the number of commands processed when Replica.handleRaftReady takes
too long.

Fixes #10917
petermattis committed Nov 22, 2016
1 parent ffe9a33 commit 89e8fe3
Showing 2 changed files with 53 additions and 29 deletions.
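
To see why the old defaults were dangerous, here is the arithmetic as a standalone sketch (illustrative only, not part of the commit): the leader may have up to MaxInflightMsgs unacknowledged MsgApps outstanding to a follower, each carrying up to MaxSizePerMsg bytes of log entries, so the worst-case in-flight volume is the product of the two.

package main

import "fmt"

func main() {
	// Previous settings: 1 MB per MsgApp, 256 messages in flight.
	oldBudget := 1024 * 1024 * 256
	// New settings: 16 KB per MsgApp, 4 messages in flight.
	newBudget := 16 * 1024 * 4
	fmt.Printf("old worst-case inflight log: %d MB\n", oldBudget/(1024*1024)) // 256 MB
	fmt.Printf("new worst-case inflight log: %d KB\n", newBudget/1024)        // 64 KB
}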
44 changes: 27 additions & 17 deletions pkg/storage/replica.go
@@ -2267,19 +2267,27 @@ func (r *Replica) maybeAbandonSnapshot(ctx context.Context) {
 	}
 }
 
+type handleRaftReadyStats struct {
+	processed int
+}
+
 // handleRaftReady processes a raft.Ready containing entries and messages that
 // are ready to read, be saved to stable storage, committed or sent to other
 // peers. It takes a non-empty IncomingSnapshot to indicate that it is
 // about to process a snapshot.
-func (r *Replica) handleRaftReady(inSnap IncomingSnapshot) error {
+func (r *Replica) handleRaftReady(inSnap IncomingSnapshot) (handleRaftReadyStats, error) {
 	r.raftMu.Lock()
 	defer r.raftMu.Unlock()
 	return r.handleRaftReadyRaftMuLocked(inSnap)
 }
 
 // handleRaftReadyRaftMuLocked is the same as handleRaftReady but requires
 // that the replica's raftMu be held.
-func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
+func (r *Replica) handleRaftReadyRaftMuLocked(
+	inSnap IncomingSnapshot,
+) (handleRaftReadyStats, error) {
+	var stats handleRaftReadyStats
+
 	ctx := r.AnnotateCtx(context.TODO())
 	var hasReady bool
 	var rd raft.Ready
@@ -2296,11 +2304,11 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
 	})
 	r.mu.Unlock()
 	if err != nil {
-		return err
+		return stats, err
 	}
 
 	if !hasReady {
-		return nil
+		return stats, nil
 	}
 
 	logRaftReady(ctx, rd)
@@ -2326,7 +2334,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
 	if !raft.IsEmptySnap(rd.Snapshot) {
 		snapUUID, err := uuid.FromBytes(rd.Snapshot.Data)
 		if err != nil {
-			return errors.Wrap(err, "invalid snapshot id")
+			return stats, errors.Wrap(err, "invalid snapshot id")
 		}
 		if inSnap.SnapUUID == (uuid.UUID{}) {
 			log.Fatalf(ctx, "programming error: a snapshot application was attempted outside of the streaming snapshot codepath")
@@ -2336,7 +2344,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
 		}
 
 		if err := r.applySnapshot(ctx, inSnap, rd.Snapshot, rd.HardState); err != nil {
-			return err
+			return stats, err
 		}
 
 		// handleRaftReady is called under the processRaftMu lock, so it is
@@ -2353,11 +2361,11 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
 			}
 			return nil
 		}(); err != nil {
-			return err
+			return stats, err
 		}
 
 		if lastIndex, err = loadLastIndex(ctx, r.store.Engine(), r.RangeID); err != nil {
-			return err
+			return stats, err
 		}
 		// We refresh pending commands after applying a snapshot because this
 		// replica may have been temporarily partitioned from the Raft group and
@@ -2381,16 +2389,16 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
 		// last index.
 		var err error
 		if lastIndex, raftLogSize, err = r.append(ctx, writer, lastIndex, raftLogSize, rd.Entries); err != nil {
-			return err
+			return stats, err
 		}
 	}
 	if !raft.IsEmptyHardState(rd.HardState) {
 		if err := setHardState(ctx, writer, r.RangeID, rd.HardState); err != nil {
-			return err
+			return stats, err
 		}
 	}
 	if err := batch.Commit(); err != nil {
-		return err
+		return stats, err
 	}
 
 	// Update protected state (last index, raft log size and raft leader
@@ -2433,38 +2441,40 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
 				var encodedCommand []byte
 				commandID, encodedCommand = DecodeRaftCommand(e.Data)
 				if err := command.Unmarshal(encodedCommand); err != nil {
-					return err
+					return stats, err
 				}
 			}
 
 			// Discard errors from processRaftCommand. The error has been sent
 			// to the client that originated it, where it will be handled.
 			_ = r.processRaftCommand(ctx, commandID, e.Index, command)
+			stats.processed++
 
 		case raftpb.EntryConfChange:
 			var cc raftpb.ConfChange
 			if err := cc.Unmarshal(e.Data); err != nil {
-				return err
+				return stats, err
 			}
 			var ccCtx ConfChangeContext
 			if err := ccCtx.Unmarshal(cc.Context); err != nil {
-				return err
+				return stats, err
 			}
 			var command storagebase.RaftCommand
 			if err := command.Unmarshal(ccCtx.Payload); err != nil {
-				return err
+				return stats, err
 			}
 			if pErr := r.processRaftCommand(
 				ctx, storagebase.CmdIDKey(ccCtx.CommandID), e.Index, command,
 			); pErr != nil {
 				// If processRaftCommand failed, tell raft that the config change was aborted.
 				cc = raftpb.ConfChange{}
 			}
+			stats.processed++
 			if err := r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) {
 				raftGroup.ApplyConfChange(cc)
 				return true, nil
 			}); err != nil {
-				return err
+				return stats, err
 			}
 		default:
 			log.Fatalf(ctx, "unexpected Raft entry: %v", e)
@@ -2479,7 +2489,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
 	// TODO(bdarnell): need to check replica id and not Advance if it
 	// has changed. Or do we need more locking to guarantee that replica
 	// ID cannot change during handleRaftReady?
-	return r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) {
+	return stats, r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) {
 		raftGroup.Advance(rd)
 		return true, nil
 	})
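The shape of the replica.go change, reduced to a minimal runnable sketch (types and names simplified; apply is a hypothetical stand-in for Replica.processRaftCommand): a stats struct is threaded through every return path so the caller can report how many commands one raft.Ready handled, even when an error cuts processing short.

package main

import (
	"fmt"
	"time"
)

type handleRaftReadyStats struct {
	processed int // commands applied while handling one raft.Ready
}

// apply is a hypothetical stand-in for Replica.processRaftCommand.
func apply(cmd string) error {
	_ = cmd
	return nil
}

func handleReady(cmds []string) (handleRaftReadyStats, error) {
	var stats handleRaftReadyStats
	for _, c := range cmds {
		if err := apply(c); err != nil {
			// Return the stats collected so far alongside the error so the
			// caller can still see partial progress.
			return stats, err
		}
		stats.processed++
	}
	return stats, nil
}

func main() {
	start := time.Now()
	stats, err := handleReady([]string{"put a", "put b", "delete a"})
	if err != nil {
		panic(err)
	}
	elapsed := time.Since(start)
	fmt.Printf("handle raft ready: %.1fs [processed=%d]\n", elapsed.Seconds(), stats.processed)
}
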
38 changes: 26 additions & 12 deletions pkg/storage/store.go
@@ -150,6 +150,11 @@ func TestStoreConfig(clock *hlc.Clock) StoreConfig {
 	}
 }
 
+var (
+	raftMaxSizePerMsg   = envutil.EnvOrDefaultInt("COCKROACH_RAFT_MAX_SIZE_PER_MSG", 16*1024)
+	raftMaxInflightMsgs = envutil.EnvOrDefaultInt("COCKROACH_RAFT_MAX_INFLIGHT_MSGS", 4)
+)
+
 func newRaftConfig(
 	strg raft.Storage, id uint64, appliedIndex uint64, storeCfg StoreConfig, logger raft.Logger,
 ) *raft.Config {
@@ -168,9 +173,17 @@ func newRaftConfig(
 		PreVote:     enablePreVote,
 		CheckQuorum: !enablePreVote,
 
-		// TODO(bdarnell): make these configurable; evaluate defaults.
-		MaxSizePerMsg:   1024 * 1024,
-		MaxInflightMsgs: 256,
+		// MaxSizePerMsg caps the total byte size of the Raft log entries the
+		// leader will send to a follower in a single MsgApp.
+		MaxSizePerMsg: uint64(raftMaxSizePerMsg),
+		// MaxInflightMsgs controls how many "inflight" messages Raft will send
+		// to a follower without hearing a response. The maximum amount of
+		// in-flight Raft log is therefore the product of this setting and
+		// MaxSizePerMsg. The current settings provide for up to 64 KB of raft
+		// log to be sent without acknowledgement. With an average entry size of
+		// 1 KB that translates to ~64 commands that might be executed in the
+		// handling of a single raft.Ready operation.
+		MaxInflightMsgs: raftMaxInflightMsgs,
 	}
 }
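
The new defaults are read from the environment via envutil.EnvOrDefaultInt, so they can be tuned without recompiling. A rough standalone approximation of that helper (hypothetical; the real implementation lives in pkg/util/envutil and differs in details such as error handling):

package main

import (
	"fmt"
	"os"
	"strconv"
)

// envOrDefaultInt returns the integer value of the named environment
// variable, or def if the variable is unset or unparsable.
func envOrDefaultInt(name string, def int) int {
	if s, ok := os.LookupEnv(name); ok {
		if v, err := strconv.Atoi(s); err == nil {
			return v
		}
	}
	return def
}

func main() {
	maxSizePerMsg := envOrDefaultInt("COCKROACH_RAFT_MAX_SIZE_PER_MSG", 16*1024)
	maxInflightMsgs := envOrDefaultInt("COCKROACH_RAFT_MAX_INFLIGHT_MSGS", 4)
	fmt.Printf("MaxSizePerMsg=%d MaxInflightMsgs=%d\n", maxSizePerMsg, maxInflightMsgs)
}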

@@ -2961,7 +2974,7 @@ func (s *Store) processRaftRequest(
 		removePlaceholder = false
 	} else {
 		// Force the replica to deal with this snapshot right now.
-		if err := r.handleRaftReadyRaftMuLocked(inSnap); err != nil {
+		if _, err := r.handleRaftReadyRaftMuLocked(inSnap); err != nil {
 			// mimic the behavior in processRaft.
 			panic(err)
 		}
@@ -3209,19 +3222,20 @@ func (s *Store) processReady(rangeID roachpb.RangeID) {
 	s.mu.Unlock()
 
 	if ok {
-		if err := r.handleRaftReady(IncomingSnapshot{}); err != nil {
+		stats, err := r.handleRaftReady(IncomingSnapshot{})
+		if err != nil {
 			panic(err) // TODO(bdarnell)
 		}
 		elapsed := timeutil.Since(start)
 		s.metrics.RaftWorkingDurationNanos.Inc(elapsed.Nanoseconds())
-		// If Raft processing took longer than 10x the raft tick interval something
-		// bad is going on. Such long processing time means we'll have starved
-		// local replicas of ticks and remote replicas will likely start
-		// campaigning.
-		var warnDuration = 10 * s.cfg.RaftTickInterval
-		if elapsed >= warnDuration {
+		// Warn if Raft processing took too long. We use the same duration as we
+		// use for warning about excessive raft mutex lock hold times. Long
+		// processing time means we'll have starved local replicas of ticks and
+		// remote replicas will likely start campaigning.
+		if elapsed >= defaultReplicaRaftMuWarnThreshold {
 			ctx := r.AnnotateCtx(context.TODO())
-			log.Warningf(ctx, "handle raft ready: %.1fs", elapsed.Seconds())
+			log.Warningf(ctx, "handle raft ready: %.1fs [processed=%d]",
+				elapsed.Seconds(), stats.processed)
 		}
 		if !r.IsInitialized() {
 			// Only an uninitialized replica can have a placeholder since, by
