Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: tune raft.Config.{MaxSizePerMsg,MaxInflightMsgs} #10929

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 27 additions & 17 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2267,19 +2267,27 @@ func (r *Replica) maybeAbandonSnapshot(ctx context.Context) {
}
}

// handleRaftReadyStats summarizes the work performed during a single call to
// handleRaftReady, so callers (e.g. Store.processReady) can report how much
// was done when processing runs long.
type handleRaftReadyStats struct {
	// processed counts the committed Raft log entries (normal entries and
	// conf changes) handed to processRaftCommand in this invocation.
	processed int
}

// handleRaftReady processes a raft.Ready containing entries and messages that
// are ready to read, be saved to stable storage, committed or sent to other
// peers. It takes a non-empty IncomingSnapshot to indicate that it is
// about to process a snapshot.
func (r *Replica) handleRaftReady(inSnap IncomingSnapshot) error {
func (r *Replica) handleRaftReady(inSnap IncomingSnapshot) (handleRaftReadyStats, error) {
	// Acquire raftMu for the duration of Ready processing and delegate to
	// the RaftMuLocked variant, which does the actual work.
	r.raftMu.Lock()
	defer r.raftMu.Unlock()
	return r.handleRaftReadyRaftMuLocked(inSnap)
}

// handleRaftReadyRaftMuLocked is the same as handleRaftReady but requires
// that the replica's raftMu be held.
func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
func (r *Replica) handleRaftReadyRaftMuLocked(
inSnap IncomingSnapshot,
) (handleRaftReadyStats, error) {
var stats handleRaftReadyStats

ctx := r.AnnotateCtx(context.TODO())
var hasReady bool
var rd raft.Ready
Expand All @@ -2296,11 +2304,11 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
})
r.mu.Unlock()
if err != nil {
return err
return stats, err
}

if !hasReady {
return nil
return stats, nil
}

logRaftReady(ctx, rd)
Expand All @@ -2326,7 +2334,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
if !raft.IsEmptySnap(rd.Snapshot) {
snapUUID, err := uuid.FromBytes(rd.Snapshot.Data)
if err != nil {
return errors.Wrap(err, "invalid snapshot id")
return stats, errors.Wrap(err, "invalid snapshot id")
}
if inSnap.SnapUUID == (uuid.UUID{}) {
log.Fatalf(ctx, "programming error: a snapshot application was attempted outside of the streaming snapshot codepath")
Expand All @@ -2336,7 +2344,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
}

if err := r.applySnapshot(ctx, inSnap, rd.Snapshot, rd.HardState); err != nil {
return err
return stats, err
}

// handleRaftReady is called under the processRaftMu lock, so it is
Expand All @@ -2353,11 +2361,11 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
}
return nil
}(); err != nil {
return err
return stats, err
}

if lastIndex, err = loadLastIndex(ctx, r.store.Engine(), r.RangeID); err != nil {
return err
return stats, err
}
// We refresh pending commands after applying a snapshot because this
// replica may have been temporarily partitioned from the Raft group and
Expand All @@ -2381,16 +2389,16 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
// last index.
var err error
if lastIndex, raftLogSize, err = r.append(ctx, writer, lastIndex, raftLogSize, rd.Entries); err != nil {
return err
return stats, err
}
}
if !raft.IsEmptyHardState(rd.HardState) {
if err := setHardState(ctx, writer, r.RangeID, rd.HardState); err != nil {
return err
return stats, err
}
}
if err := batch.Commit(); err != nil {
return err
return stats, err
}

// Update protected state (last index, raft log size and raft leader
Expand Down Expand Up @@ -2433,38 +2441,40 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
var encodedCommand []byte
commandID, encodedCommand = DecodeRaftCommand(e.Data)
if err := command.Unmarshal(encodedCommand); err != nil {
return err
return stats, err
}
}

// Discard errors from processRaftCommand. The error has been sent
// to the client that originated it, where it will be handled.
_ = r.processRaftCommand(ctx, commandID, e.Index, command)
stats.processed++

case raftpb.EntryConfChange:
var cc raftpb.ConfChange
if err := cc.Unmarshal(e.Data); err != nil {
return err
return stats, err
}
var ccCtx ConfChangeContext
if err := ccCtx.Unmarshal(cc.Context); err != nil {
return err
return stats, err
}
var command storagebase.RaftCommand
if err := command.Unmarshal(ccCtx.Payload); err != nil {
return err
return stats, err
}
if pErr := r.processRaftCommand(
ctx, storagebase.CmdIDKey(ccCtx.CommandID), e.Index, command,
); pErr != nil {
// If processRaftCommand failed, tell raft that the config change was aborted.
cc = raftpb.ConfChange{}
}
stats.processed++
if err := r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) {
raftGroup.ApplyConfChange(cc)
return true, nil
}); err != nil {
return err
return stats, err
}
default:
log.Fatalf(ctx, "unexpected Raft entry: %v", e)
Expand All @@ -2479,7 +2489,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
// TODO(bdarnell): need to check replica id and not Advance if it
// has changed. Or do we need more locking to guarantee that replica
// ID cannot change during handleRaftReady?
return r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) {
return stats, r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) {
raftGroup.Advance(rd)
return true, nil
})
Expand Down
38 changes: 26 additions & 12 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ func TestStoreConfig(clock *hlc.Clock) StoreConfig {
}
}

var (
	// raftMaxSizePerMsg bounds the byte size of Raft log entries packed into a
	// single MsgApp sent to a follower. Defaults to 16 KB; overridable via the
	// COCKROACH_RAFT_MAX_SIZE_PER_MSG environment variable.
	raftMaxSizePerMsg = envutil.EnvOrDefaultInt("COCKROACH_RAFT_MAX_SIZE_PER_MSG", 16*1024)
	// raftMaxInflightMsgs bounds how many unacknowledged messages Raft keeps
	// in flight to a follower. Defaults to 4 (together with raftMaxSizePerMsg
	// this allows up to 64 KB of unacknowledged raft log); overridable via the
	// COCKROACH_RAFT_MAX_INFLIGHT_MSGS environment variable.
	raftMaxInflightMsgs = envutil.EnvOrDefaultInt("COCKROACH_RAFT_MAX_INFLIGHT_MSGS", 4)
)

func newRaftConfig(
strg raft.Storage, id uint64, appliedIndex uint64, storeCfg StoreConfig, logger raft.Logger,
) *raft.Config {
Expand All @@ -168,9 +173,17 @@ func newRaftConfig(
PreVote: enablePreVote,
CheckQuorum: !enablePreVote,

// TODO(bdarnell): make these configurable; evaluate defaults.
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
// MaxSizePerMsg controls how many Raft log entries the leader will send to
// followers in a single MsgApp.
MaxSizePerMsg: uint64(raftMaxSizePerMsg),
// MaxInflightMsgs controls how many "inflight" messages Raft will send to
// a follower without hearing a response. The total number of Raft log
// entries is a combination of this setting and MaxSizePerMsg. The current
// settings provide for up to 64 KB of raft log to be sent without
// acknowledgement. With an average entry size of 1 KB that translates to
// ~64 commands that might be executed in the handling of a single
// raft.Ready operation.
MaxInflightMsgs: raftMaxInflightMsgs,
}
}

Expand Down Expand Up @@ -2961,7 +2974,7 @@ func (s *Store) processRaftRequest(
removePlaceholder = false
} else {
// Force the replica to deal with this snapshot right now.
if err := r.handleRaftReadyRaftMuLocked(inSnap); err != nil {
if _, err := r.handleRaftReadyRaftMuLocked(inSnap); err != nil {
// mimic the behavior in processRaft.
panic(err)
}
Expand Down Expand Up @@ -3209,19 +3222,20 @@ func (s *Store) processReady(rangeID roachpb.RangeID) {
s.mu.Unlock()

if ok {
if err := r.handleRaftReady(IncomingSnapshot{}); err != nil {
stats, err := r.handleRaftReady(IncomingSnapshot{})
if err != nil {
panic(err) // TODO(bdarnell)
}
elapsed := timeutil.Since(start)
s.metrics.RaftWorkingDurationNanos.Inc(elapsed.Nanoseconds())
// If Raft processing took longer than 10x the raft tick interval something
// bad is going on. Such long processing time means we'll have starved
// local replicas of ticks and remote replicas will likely start
// campaigning.
var warnDuration = 10 * s.cfg.RaftTickInterval
if elapsed >= warnDuration {
// Warn if Raft processing took too long. We use the same duration as we
// use for warning about excessive raft mutex lock hold times. Long
// processing time means we'll have starved local replicas of ticks and
// remote replicas will likely start campaigning.
if elapsed >= defaultReplicaRaftMuWarnThreshold {
ctx := r.AnnotateCtx(context.TODO())
log.Warningf(ctx, "handle raft ready: %.1fs", elapsed.Seconds())
log.Warningf(ctx, "handle raft ready: %.1fs [processed=%d]",
elapsed.Seconds(), stats.processed)
}
if !r.IsInitialized() {
// Only an uninitialized replica can have a placeholder since, by
Expand Down