From e4003e160982cb3f3f31eaedb15b80a87d163a48 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Mon, 15 Oct 2018 17:50:57 -0400
Subject: [PATCH] storage: move Raft log configurations into base.RaftConfig

This centralizes all Raft configuration and makes it easier to
configure in tests.

Release note: None
---
 pkg/base/config.go                   | 50 ++++++++++++++++++++++++++--
 pkg/storage/raft_log_queue.go        |  8 ++---
 pkg/storage/raft_transport.go        |  4 ++-
 pkg/storage/replica.go               | 28 ++++++++--------
 pkg/storage/replica_command.go       |  9 ++++-
 pkg/storage/replica_sideload_test.go |  2 ++
 pkg/storage/replica_test.go          |  4 +--
 pkg/storage/store.go                 | 31 +++++------------
 pkg/storage/store_snapshot.go        | 12 +++++--
 pkg/storage/store_snapshot_test.go   |  3 +-
 pkg/storage/store_test.go            | 10 +++---
 11 files changed, 105 insertions(+), 56 deletions(-)

diff --git a/pkg/base/config.go b/pkg/base/config.go
index 24af703d1cc7..ffe1a9108d54 100644
--- a/pkg/base/config.go
+++ b/pkg/base/config.go
@@ -90,8 +90,27 @@ const (
 	DefaultTableDescriptorLeaseRenewalTimeout = time.Minute
 )
 
-var defaultRaftElectionTimeoutTicks = envutil.EnvOrDefaultInt(
-	"COCKROACH_RAFT_ELECTION_TIMEOUT_TICKS", 15)
+var (
+	// defaultRaftElectionTimeoutTicks specifies the number of Raft Tick
+	// invocations that must pass between elections.
+	defaultRaftElectionTimeoutTicks = envutil.EnvOrDefaultInt(
+		"COCKROACH_RAFT_ELECTION_TIMEOUT_TICKS", 15)
+
+	// defaultRaftLogMaxSize specifies the upper bound on the size of a
+	// single Range's Raft log.
+	defaultRaftLogMaxSize = envutil.EnvOrDefaultInt64(
+		"COCKROACH_RAFT_LOG_MAX_SIZE", 4<<20 /* 4 MB */)
+
+	// defaultRaftMaxSizePerMsg specifies the maximum aggregate byte size of
+	// Raft log entries that a leader will send to followers in a single MsgApp.
+	defaultRaftMaxSizePerMsg = envutil.EnvOrDefaultInt(
+		"COCKROACH_RAFT_MAX_SIZE_PER_MSG", 16*1024)
+
+	// defaultRaftMaxInflightMsgs specifies how many "inflight" messages a leader
+	// will send to a follower without hearing a response.
+	defaultRaftMaxInflightMsgs = envutil.EnvOrDefaultInt(
+		"COCKROACH_RAFT_MAX_INFLIGHT_MSGS", 64)
+)
 
 type lazyHTTPClient struct {
 	once sync.Once
@@ -421,6 +440,24 @@ type RaftConfig struct {
 	// RangeLeaseRaftElectionTimeoutMultiplier specifies what multiple the leader
 	// lease active duration should be of the raft election timeout.
 	RangeLeaseRaftElectionTimeoutMultiplier float64
+
+	// RaftLogMaxSize controls how large a single Range's Raft log can grow.
+	// When a Range's Raft log grows above this size, the Range will begin
+	// performing log truncations.
+	RaftLogMaxSize int64
+
+	// RaftMaxSizePerMsg controls the maximum aggregate byte size of Raft log
+	// entries the leader will send to followers in a single MsgApp.
+	RaftMaxSizePerMsg uint64
+
+	// RaftMaxInflightMsgs controls how many "inflight" messages Raft will send
+	// to a follower without hearing a response. The total size of in-flight
+	// Raft log entries is bounded by this setting times RaftMaxSizePerMsg. The
+	// current default settings provide for up to 1 MB of raft log to be sent
+	// without acknowledgement. With an average entry size of 1 KB that
+	// translates to ~1024 commands that might be executed in the handling of a
+	// single raft.Ready operation.
+	RaftMaxInflightMsgs int
 }
 
 // SetDefaults initializes unset fields.
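Aside: the "1 MB" and "~1024 commands" figures in the RaftMaxInflightMsgs
comment follow directly from the defaults above. The standalone Go sketch
below spells out the arithmetic; it is illustrative only, not code from this
patch, and the constant names are made up:

package main

import "fmt"

func main() {
	// Defaults from pkg/base/config.go above (illustrative constants).
	const (
		maxSizePerMsg   = 16 << 10 // COCKROACH_RAFT_MAX_SIZE_PER_MSG: 16 KB per MsgApp
		maxInflightMsgs = 64       // COCKROACH_RAFT_MAX_INFLIGHT_MSGS: unacked MsgApps
		avgEntrySize    = 1 << 10  // assumed 1 KB average log entry
	)
	// Upper bound on unacknowledged Raft log bytes sent to one follower.
	fmt.Println(maxSizePerMsg * maxInflightMsgs) // 1048576 (1 MB)
	// Rough count of commands that might apply in a single raft.Ready.
	fmt.Println(maxSizePerMsg * maxInflightMsgs / avgEntrySize) // 1024
}

Changing either default shifts this bound proportionally, which is why the
two settings are documented together.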
@@ -434,6 +471,15 @@ func (cfg *RaftConfig) SetDefaults() {
 	if cfg.RangeLeaseRaftElectionTimeoutMultiplier == 0 {
 		cfg.RangeLeaseRaftElectionTimeoutMultiplier = defaultRangeLeaseRaftElectionTimeoutMultiplier
 	}
+	if cfg.RaftLogMaxSize == 0 {
+		cfg.RaftLogMaxSize = defaultRaftLogMaxSize
+	}
+	if cfg.RaftMaxSizePerMsg == 0 {
+		cfg.RaftMaxSizePerMsg = uint64(defaultRaftMaxSizePerMsg)
+	}
+	if cfg.RaftMaxInflightMsgs == 0 {
+		cfg.RaftMaxInflightMsgs = defaultRaftMaxInflightMsgs
+	}
 }
 
 // RaftElectionTimeout returns the raft election timeout, as computed from the
diff --git a/pkg/storage/raft_log_queue.go b/pkg/storage/raft_log_queue.go
index 2178dde8065b..52726f819d2a 100644
--- a/pkg/storage/raft_log_queue.go
+++ b/pkg/storage/raft_log_queue.go
@@ -26,7 +26,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/gossip"
 	"github.com/cockroachdb/cockroach/pkg/internal/client"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
-	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 )
@@ -49,9 +48,6 @@ const (
 	raftLogQueueConcurrency = 4
 )
 
-// raftLogMaxSize limits the maximum size of the Raft log.
-var raftLogMaxSize = envutil.EnvOrDefaultInt64("COCKROACH_RAFT_LOG_MAX_SIZE", 4<<20 /* 4 MB */)
-
 // raftLogQueue manages a queue of replicas slated to have their raft logs
 // truncated by removing unneeded entries.
 type raftLogQueue struct {
@@ -118,8 +114,8 @@ func getTruncatableIndexes(ctx context.Context, r *Replica) (uint64, uint64, int
 	if targetSize > *r.mu.zone.RangeMaxBytes {
 		targetSize = *r.mu.zone.RangeMaxBytes
 	}
-	if targetSize > raftLogMaxSize {
-		targetSize = raftLogMaxSize
+	if targetSize > r.store.cfg.RaftLogMaxSize {
+		targetSize = r.store.cfg.RaftLogMaxSize
 	}
 	firstIndex, err := r.raftFirstIndexLocked()
 	pendingSnapshotIndex := r.mu.pendingSnapshotIndex
diff --git a/pkg/storage/raft_transport.go b/pkg/storage/raft_transport.go
index 96d7f6573c30..8d858e47b5b5 100644
--- a/pkg/storage/raft_transport.go
+++ b/pkg/storage/raft_transport.go
@@ -29,6 +29,7 @@ import (
 	"go.etcd.io/etcd/raft/raftpb"
 	"google.golang.org/grpc"
 
+	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -615,6 +616,7 @@ func (t *RaftTransport) startProcessNewQueue(
 // for closing the OutgoingSnapshot.
 func (t *RaftTransport) SendSnapshot(
 	ctx context.Context,
+	raftCfg *base.RaftConfig,
 	storePool *StorePool,
 	header SnapshotRequest_Header,
 	snap *OutgoingSnapshot,
@@ -640,5 +642,5 @@ func (t *RaftTransport) SendSnapshot(
 			log.Warningf(ctx, "failed to close snapshot stream: %s", err)
 		}
 	}()
-	return sendSnapshot(ctx, t.st, stream, storePool, header, snap, newBatch, sent)
+	return sendSnapshot(ctx, raftCfg, t.st, stream, storePool, header, snap, newBatch, sent)
 }
diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index b0a2acac555d..b8383c9d8385 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -93,19 +93,8 @@ const (
 	defaultReplicaRaftMuWarnThreshold = 500 * time.Millisecond
 )
 
-var raftLogTooLargeSize = 4 * raftLogMaxSize
-
 var testingDisableQuiescence = envutil.EnvOrDefaultBool("COCKROACH_DISABLE_QUIESCENCE", false)
 
-// TODO(irfansharif, peter): What's a good default? Too low and everything comes
-// to a grinding halt, too high and we're not really throttling anything
-// (we'll still generate snapshots). Should it be adjusted dynamically?
-//
-// We set the defaultProposalQuota to be less than raftLogMaxSize, in doing so
-// we ensure all replicas have sufficiently up to date logs so that when the
-// log gets truncated, the followers do not need non-preemptive snapshots.
-var defaultProposalQuota = raftLogMaxSize / 4
-
 var syncRaftLog = settings.RegisterBoolSetting(
 	"kv.raft_log.synchronize",
 	"set to true to synchronize on Raft log writes to persistent storage ('false' risks data loss)",
@@ -1129,12 +1118,22 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
 			log.Fatalf(ctx, "len(r.mu.commandSizes) = %d, expected 0", commandSizesLen)
 		}
 
+		// We set the proposal quota to be less than RaftLogMaxSize; in
+		// doing so we ensure all replicas have sufficiently up-to-date
+		// logs so that when the log gets truncated, the followers do not
+		// need non-preemptive snapshots. Changing this deserves care. Too
+		// low and everything comes to a grinding halt, too high and we're
+		// not really throttling anything (we'll still generate snapshots).
+		//
+		// TODO(nvanbenschoten): clean this up in later commits.
+		proposalQuota := r.store.cfg.RaftLogMaxSize / 4
+
 		// Raft may propose commands itself (specifically the empty
 		// commands when leadership changes), and these commands don't go
 		// through the code paths where we acquire quota from the pool. To
 		// offset this we reset the quota pool whenever leadership changes
 		// hands.
-		r.mu.proposalQuota = newQuotaPool(defaultProposalQuota)
+		r.mu.proposalQuota = newQuotaPool(proposalQuota)
 		r.mu.lastUpdateTimes = make(map[roachpb.ReplicaID]time.Time)
 		r.mu.commandSizes = make(map[storagebase.CmdIDKey]int)
 	} else if r.mu.proposalQuota != nil {
@@ -6967,6 +6966,7 @@ func (r *Replica) Metrics(
 	return calcReplicaMetrics(
 		ctx,
 		now,
+		&r.store.cfg.RaftConfig,
 		zone,
 		livenessMap,
 		availableNodes,
@@ -6994,6 +6994,7 @@ func HasRaftLeader(raftStatus *raft.Status) bool {
 func calcReplicaMetrics(
 	ctx context.Context,
 	now hlc.Timestamp,
+	raftCfg *base.RaftConfig,
 	zone *config.ZoneConfig,
 	livenessMap IsLiveMap,
 	availableNodes int,
@@ -7033,7 +7034,8 @@ func calcReplicaMetrics(
 	m.CmdQMetricsLocal = cmdQMetricsLocal
 	m.CmdQMetricsGlobal = cmdQMetricsGlobal
 
-	m.RaftLogTooLarge = raftLogSize > raftLogTooLargeSize
+	const raftLogTooLargeMultiple = 4
+	m.RaftLogTooLarge = raftLogSize > (raftLogTooLargeMultiple * raftCfg.RaftLogMaxSize)
 
 	return m
 }
diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go
index f8ab63162e20..e555c31e869d 100644
--- a/pkg/storage/replica_command.go
+++ b/pkg/storage/replica_command.go
@@ -942,7 +942,14 @@ func (r *Replica) sendSnapshot(
 		r.store.metrics.RangeSnapshotsGenerated.Inc(1)
 	}
 	if err := r.store.cfg.Transport.SendSnapshot(
-		ctx, r.store.allocator.storePool, req, snap, r.store.Engine().NewBatch, sent); err != nil {
+		ctx,
+		&r.store.cfg.RaftConfig,
+		r.store.allocator.storePool,
+		req,
+		snap,
+		r.store.Engine().NewBatch,
+		sent,
+	); err != nil {
 		return &snapshotError{err}
 	}
 	return nil
diff --git a/pkg/storage/replica_sideload_test.go b/pkg/storage/replica_sideload_test.go
index c8d06846f0f8..b31d5bc9483c 100644
--- a/pkg/storage/replica_sideload_test.go
+++ b/pkg/storage/replica_sideload_test.go
@@ -783,6 +783,7 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) {
 	mockSender := &mockSender{}
 	if err := sendSnapshot(
 		ctx,
+		&tc.store.cfg.RaftConfig,
 		tc.store.cfg.Settings,
 		mockSender,
 		&fakeStorePool{},
@@ -904,6 +905,7 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) {
 		mockSender := &mockSender{}
 		err = sendSnapshot(
 			ctx,
+			&tc.store.cfg.RaftConfig,
 			tc.store.cfg.Settings,
 			mockSender,
 			&fakeStorePool{},
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index 9e82c0ef8f78..a017dc9592b3 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -9272,7 +9272,7 @@ func TestReplicaMetrics(t *testing.T) {
 			Underreplicated: false,
 		}},
 		// The leader of a 1-replica range is up and raft log is too large.
-		{1, 1, desc(1), status(1, progress(2)), live(1), 5 * raftLogMaxSize,
+		{1, 1, desc(1), status(1, progress(2)), live(1), 5 * cfg.RaftLogMaxSize,
 			ReplicaMetrics{
 				Leader:       true,
 				RangeCounter: true,
@@ -9294,7 +9294,7 @@ func TestReplicaMetrics(t *testing.T) {
 		c.expected.Quiescent = i%2 == 0
 		c.expected.Ticking = !c.expected.Quiescent
 		metrics := calcReplicaMetrics(
-			context.Background(), hlc.Timestamp{}, &zoneConfig,
+			context.Background(), hlc.Timestamp{}, &cfg.RaftConfig, &zoneConfig,
 			c.liveness, 0, &c.desc, c.raftStatus, storagepb.LeaseStatus{},
 			c.storeID, c.expected.Quiescent, c.expected.Ticking,
 			CommandQueueMetrics{}, CommandQueueMetrics{}, c.raftLogSize)
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index ef9ebdeba1a3..2a7108ebe3e0 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -163,35 +163,20 @@ func TestStoreConfig(clock *hlc.Clock) StoreConfig {
 	return sc
 }
 
-var (
-	raftMaxSizePerMsg   = envutil.EnvOrDefaultInt("COCKROACH_RAFT_MAX_SIZE_PER_MSG", 16*1024)
-	raftMaxInflightMsgs = envutil.EnvOrDefaultInt("COCKROACH_RAFT_MAX_INFLIGHT_MSGS", 64)
-)
-
 func newRaftConfig(
 	strg raft.Storage, id uint64, appliedIndex uint64, storeCfg StoreConfig, logger raft.Logger,
 ) *raft.Config {
 	return &raft.Config{
-		ID:            id,
-		Applied:       appliedIndex,
-		ElectionTick:  storeCfg.RaftElectionTimeoutTicks,
-		HeartbeatTick: storeCfg.RaftHeartbeatIntervalTicks,
-		Storage:       strg,
-		Logger:        logger,
+		ID:              id,
+		Applied:         appliedIndex,
+		ElectionTick:    storeCfg.RaftElectionTimeoutTicks,
+		HeartbeatTick:   storeCfg.RaftHeartbeatIntervalTicks,
+		MaxSizePerMsg:   storeCfg.RaftMaxSizePerMsg,
+		MaxInflightMsgs: storeCfg.RaftMaxInflightMsgs,
+		Storage:         strg,
+		Logger:          logger,
 
 		PreVote: true,
-
-		// MaxSizePerMsg controls how many Raft log entries the leader will send to
-		// followers in a single MsgApp.
-		MaxSizePerMsg: uint64(raftMaxSizePerMsg),
-		// MaxInflightMsgs controls how many "inflight" messages Raft will send to
-		// a follower without hearing a response. The total number of Raft log
-		// entries is a combination of this setting and MaxSizePerMsg. The current
-		// settings provide for up to 1 MB of raft log to be sent without
-		// acknowledgement. With an average entry size of 1 KB that translates to
-		// ~1024 commands that might be executed in the handling of a single
-		// raft.Ready operation.
-		MaxInflightMsgs: raftMaxInflightMsgs,
 	}
 }
diff --git a/pkg/storage/store_snapshot.go b/pkg/storage/store_snapshot.go
index 26d076d81135..5bf1da8781ca 100644
--- a/pkg/storage/store_snapshot.go
+++ b/pkg/storage/store_snapshot.go
@@ -24,6 +24,7 @@ import (
 	"go.etcd.io/etcd/raft/raftpb"
 	"golang.org/x/time/rate"
 
+	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -98,7 +99,8 @@ func assertStrategy(
 // kvBatchSnapshotStrategy is an implementation of snapshotStrategy that streams
 // batches of KV pairs in the BatchRepr format.
 type kvBatchSnapshotStrategy struct {
-	status string
+	raftCfg *base.RaftConfig
+	status  string
 
 	// Fields used when sending snapshots.
 	batchSize int64
@@ -228,7 +230,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send(
 			if err == nil {
 				logEntries = append(logEntries, bytes)
 				raftLogBytes += int64(len(bytes))
-				if snap.snapType == snapTypePreemptive && raftLogBytes > 4*raftLogMaxSize {
+				if snap.snapType == snapTypePreemptive && raftLogBytes > 4*kvSS.raftCfg.RaftLogMaxSize {
 					// If the raft log is too large, abort the snapshot instead of
 					// potentially running out of memory. However, if this is a
 					// raft-initiated snapshot (instead of a preemptive one), we
@@ -492,7 +494,9 @@ func (s *Store) receiveSnapshot(
 	var ss snapshotStrategy
 	switch header.Strategy {
 	case SnapshotRequest_KV_BATCH:
-		ss = &kvBatchSnapshotStrategy{}
+		ss = &kvBatchSnapshotStrategy{
+			raftCfg: &s.cfg.RaftConfig,
+		}
 	default:
 		return sendSnapshotError(stream,
 			errors.Errorf("%s,r%d: unknown snapshot strategy: %s",
@@ -568,6 +572,7 @@ func (e *errMustRetrySnapshotDueToTruncation) Error() string {
 // sendSnapshot sends an outgoing snapshot via a pre-opened GRPC stream.
 func sendSnapshot(
 	ctx context.Context,
+	raftCfg *base.RaftConfig,
 	st *cluster.Settings,
 	stream outgoingSnapshotStream,
 	storePool SnapshotStorePool,
@@ -635,6 +640,7 @@ func sendSnapshot(
 	switch header.Strategy {
 	case SnapshotRequest_KV_BATCH:
 		ss = &kvBatchSnapshotStrategy{
+			raftCfg:   raftCfg,
 			batchSize: batchSize,
 			limiter:   limiter,
 			newBatch:  newBatch,
diff --git a/pkg/storage/store_snapshot_test.go b/pkg/storage/store_snapshot_test.go
index c6a951349385..0810bc9034dd 100644
--- a/pkg/storage/store_snapshot_test.go
+++ b/pkg/storage/store_snapshot_test.go
@@ -45,7 +45,7 @@ func TestSnapshotRaftLogLimit(t *testing.T) {
 
 	var bytesWritten int64
 	blob := []byte(strings.Repeat("a", 1024*1024))
-	for i := 0; bytesWritten < 5*raftLogMaxSize; i++ {
+	for i := 0; bytesWritten < 5*store.cfg.RaftLogMaxSize; i++ {
 		pArgs := putArgs(roachpb.Key("a"), blob)
 		_, pErr := client.SendWrappedWith(ctx, store, roachpb.Header{RangeID: 1}, &pArgs)
 		if pErr != nil {
@@ -65,6 +65,7 @@ func TestSnapshotRaftLogLimit(t *testing.T) {
 	defer snap.Close()
 
 	ss := kvBatchSnapshotStrategy{
+		raftCfg:  &store.cfg.RaftConfig,
 		limiter:  rate.NewLimiter(1<<10, 1),
 		newBatch: eng.NewBatch,
 	}
diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go
index 810fd6b29c8b..343f13170ef5 100644
--- a/pkg/storage/store_test.go
+++ b/pkg/storage/store_test.go
@@ -3049,6 +3049,8 @@ func TestSendSnapshotThrottling(t *testing.T) {
 	defer e.Close()
 	ctx := context.Background()
 
+	var cfg base.RaftConfig
+	cfg.SetDefaults()
 	st := cluster.MakeTestingClusterSettings()
 
 	header := SnapshotRequest_Header{
@@ -3064,7 +3066,7 @@ func TestSendSnapshotThrottling(t *testing.T) {
 		sp := &fakeStorePool{}
 		expectedErr := errors.New("")
 		c := fakeSnapshotStream{nil, expectedErr}
-		err := sendSnapshot(ctx, st, c, sp, header, nil, newBatch, nil)
+		err := sendSnapshot(ctx, &cfg, st, c, sp, header, nil, newBatch, nil)
 		if sp.failedThrottles != 1 {
 			t.Fatalf("expected 1 failed throttle, but found %d", sp.failedThrottles)
 		}
@@ -3080,7 +3082,7 @@ func TestSendSnapshotThrottling(t *testing.T) {
 			Status: SnapshotResponse_DECLINED,
 		}
 		c := fakeSnapshotStream{resp, nil}
-		err := sendSnapshot(ctx, st, c, sp, header, nil, newBatch, nil)
+		err := sendSnapshot(ctx, &cfg, st, c, sp, header, nil, newBatch, nil)
 		if sp.declinedThrottles != 1 {
 			t.Fatalf("expected 1 declined throttle, but found %d", sp.declinedThrottles)
 		}
@@ -3097,7 +3099,7 @@ func TestSendSnapshotThrottling(t *testing.T) {
 			Status: SnapshotResponse_DECLINED,
 		}
 		c := fakeSnapshotStream{resp, nil}
-		err := sendSnapshot(ctx, st, c, sp, header, nil, newBatch, nil)
+		err := sendSnapshot(ctx, &cfg, st, c, sp, header, nil, newBatch, nil)
 		if sp.failedThrottles != 1 {
 			t.Fatalf("expected 1 failed throttle, but found %d", sp.failedThrottles)
 		}
@@ -3113,7 +3115,7 @@ func TestSendSnapshotThrottling(t *testing.T) {
 			Status: SnapshotResponse_ERROR,
 		}
 		c := fakeSnapshotStream{resp, nil}
-		err := sendSnapshot(ctx, st, c, sp, header, nil, newBatch, nil)
+		err := sendSnapshot(ctx, &cfg, st, c, sp, header, nil, newBatch, nil)
 		if sp.failedThrottles != 1 {
 			t.Fatalf("expected 1 failed throttle, but found %d", sp.failedThrottles)
 		}
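With every Raft tuning knob now hanging off base.RaftConfig, tests can
override Raft log sizing directly on the config struct and let SetDefaults
fill in whatever is left unset, rather than exporting COCKROACH_RAFT_*
environment variables process-wide. A hypothetical sketch of that usage
follows (the test name and assertions are illustrative, not part of this
patch); it exercises only the SetDefaults behavior added above, in the same
way TestSendSnapshotThrottling builds its base.RaftConfig:

package base_test

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
)

// TestRaftConfigOverride (hypothetical) checks that explicitly-set fields
// survive SetDefaults while zero-valued fields pick up the defaults.
func TestRaftConfigOverride(t *testing.T) {
	var cfg base.RaftConfig
	cfg.RaftLogMaxSize = 1 << 16 // 64 KB, e.g. to force early log truncation
	cfg.SetDefaults()
	if cfg.RaftLogMaxSize != 1<<16 {
		t.Errorf("explicit RaftLogMaxSize was clobbered: %d", cfg.RaftLogMaxSize)
	}
	if cfg.RaftMaxSizePerMsg == 0 || cfg.RaftMaxInflightMsgs == 0 {
		t.Error("zero-valued fields were not defaulted")
	}
}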