From faff1082a88db488633ebae7762f9497d27fc2da Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 19 Jun 2019 20:26:46 -0400 Subject: [PATCH] storage: remove commandSizes map, use quota size on ProposalData This commit addresses a TODO added in the previous commit to remove the commandSizes map in favor of using the quota size field tracked directly on the ProposalData object. This allows us to avoid an extra exclusive lock on each applied command, reducing below Raft lock contention with above Raft processing (which often holds a shared lock). Release note: None --- pkg/storage/client_raft_test.go | 6 --- pkg/storage/helpers_test.go | 8 ---- pkg/storage/replica.go | 20 ++++----- pkg/storage/replica_proposal.go | 4 +- pkg/storage/replica_proposal_buf.go | 3 -- pkg/storage/replica_proposal_quota.go | 11 +---- pkg/storage/replica_raft.go | 58 +++++++++------------------ 7 files changed, 29 insertions(+), 81 deletions(-) diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go index 0010f2542943..061aea9d9a05 100644 --- a/pkg/storage/client_raft_test.go +++ b/pkg/storage/client_raft_test.go @@ -2171,9 +2171,6 @@ func TestQuotaPool(t *testing.T) { if qLen := leaderRepl.QuotaReleaseQueueLen(); qLen != 1 { return errors.Errorf("expected 1 queued quota release, found: %d", qLen) } - if cLen := leaderRepl.CommandSizesLen(); cLen != 0 { - return errors.Errorf("expected zero-length command sizes map, found %d", cLen) - } return nil }) @@ -2196,9 +2193,6 @@ func TestQuotaPool(t *testing.T) { if qLen := leaderRepl.QuotaReleaseQueueLen(); qLen != 0 { return errors.Errorf("expected no queued quota releases, found: %d", qLen) } - if cLen := leaderRepl.CommandSizesLen(); cLen != 0 { - return errors.Errorf("expected zero-length command sizes map, found %d", cLen) - } return nil }) diff --git a/pkg/storage/helpers_test.go b/pkg/storage/helpers_test.go index bed1bb726ec2..bddaac9c63c2 100644 --- a/pkg/storage/helpers_test.go +++ b/pkg/storage/helpers_test.go @@ -32,7 +32,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/rditer" - "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -273,7 +272,6 @@ func (r *Replica) InitQuotaPool(quota int64) { } r.mu.proposalQuota = newQuotaPool(quota) r.mu.quotaReleaseQueue = nil - r.mu.commandSizes = make(map[storagebase.CmdIDKey]int) } // QuotaAvailable returns the quota available in the replica's quota pool. Only @@ -296,12 +294,6 @@ func (r *Replica) IsFollowerActive(ctx context.Context, followerID roachpb.Repli return r.mu.lastUpdateTimes.isFollowerActive(ctx, followerID, timeutil.Now()) } -func (r *Replica) CommandSizesLen() int { - r.mu.Lock() - defer r.mu.Unlock() - return len(r.mu.commandSizes) -} - // GetTSCacheHighWater returns the high water mark of the replica's timestamp // cache. func (r *Replica) GetTSCacheHighWater() hlc.Timestamp { diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index fbdc8164b013..a15aac8168fb 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -398,16 +398,11 @@ type Replica struct { proposalQuotaBaseIndex uint64 - // For command size based allocations we keep track of the sizes of all - // in-flight commands. - commandSizes map[storagebase.CmdIDKey]int - - // Once the leader observes a proposal come 'out of Raft', we consult - // the 'commandSizes' map to determine the size of the associated - // command and add it to a queue of quotas we have yet to release back - // to the quota pool. We only do so when all replicas have persisted - // the corresponding entry into their logs. - quotaReleaseQueue []int + // Once the leader observes a proposal come 'out of Raft', we add the + // size of the associated command to a queue of quotas we have yet to + // release back to the quota pool. We only do so when all replicas have + // persisted the corresponding entry into their logs. + quotaReleaseQueue []int64 // Counts calls to Replica.tick() ticks int @@ -588,9 +583,8 @@ func (r *Replica) cleanupFailedProposalLocked(p *ProposalData) { // NB: We may be double free-ing here in cases where proposals are // duplicated. To counter this our quota pool is capped at the initial // quota size. - if cmdSize, ok := r.mu.commandSizes[p.idKey]; ok { - r.mu.proposalQuota.add(int64(cmdSize)) - delete(r.mu.commandSizes, p.idKey) + if r.mu.proposalQuota != nil { + r.mu.proposalQuota.add(p.quotaSize) } } diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index 859f49da8a93..bac303278ff4 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -78,9 +78,7 @@ type ProposalData struct { // quotaSize is the encoded size of command that was used to acquire // proposal quota. command.Size can change slightly as the object is // mutated, so it's safer to record the exact value used here. - // TODO(nvanbenschoten): we're already tracking this here, so why do - // we need the separate commandSizes map? Let's get rid of it. - quotaSize int + quotaSize int64 // tmpFooter is used to avoid an allocation. tmpFooter storagepb.RaftCommandFooter diff --git a/pkg/storage/replica_proposal_buf.go b/pkg/storage/replica_proposal_buf.go index d5a1fc166fa7..f77fbf043d0a 100644 --- a/pkg/storage/replica_proposal_buf.go +++ b/pkg/storage/replica_proposal_buf.go @@ -599,7 +599,4 @@ func (rp *replicaProposer) registerProposalLocked(p *ProposalData) { // decide if/when to re-propose it. p.proposedAtTicks = rp.mu.ticks rp.mu.proposals[p.idKey] = p - if rp.mu.commandSizes != nil { - rp.mu.commandSizes[p.idKey] = p.quotaSize - } } diff --git a/pkg/storage/replica_proposal_quota.go b/pkg/storage/replica_proposal_quota.go index aeb94a1dde77..4272bdbea534 100644 --- a/pkg/storage/replica_proposal_quota.go +++ b/pkg/storage/replica_proposal_quota.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "go.etcd.io/etcd/raft" @@ -99,7 +98,6 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( r.mu.proposalQuota = nil r.mu.lastUpdateTimes = nil r.mu.quotaReleaseQueue = nil - r.mu.commandSizes = nil return } @@ -114,9 +112,6 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( if releaseQueueLen := len(r.mu.quotaReleaseQueue); releaseQueueLen != 0 { log.Fatalf(ctx, "len(r.mu.quotaReleaseQueue) = %d, expected 0", releaseQueueLen) } - if commandSizesLen := len(r.mu.commandSizes); commandSizesLen != 0 { - log.Fatalf(ctx, "len(r.mu.commandSizes) = %d, expected 0", commandSizesLen) - } // Raft may propose commands itself (specifically the empty // commands when leadership changes), and these commands don't go @@ -126,7 +121,6 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( r.mu.proposalQuota = newQuotaPool(r.store.cfg.RaftProposalQuota) r.mu.lastUpdateTimes = make(map[roachpb.ReplicaID]time.Time) r.mu.lastUpdateTimes.updateOnBecomeLeader(r.mu.state.Desc.Replicas().Unwrap(), timeutil.Now()) - r.mu.commandSizes = make(map[storagebase.CmdIDKey]int) } else if r.mu.proposalQuota != nil { // We're becoming a follower. @@ -136,7 +130,6 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( r.mu.proposalQuota = nil r.mu.lastUpdateTimes = nil r.mu.quotaReleaseQueue = nil - r.mu.commandSizes = nil } return } else if r.mu.proposalQuota == nil { @@ -237,13 +230,13 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( if qLen := uint64(len(r.mu.quotaReleaseQueue)); qLen < numReleases { numReleases = qLen } - sum := 0 + sum := int64(0) for _, rel := range r.mu.quotaReleaseQueue[:numReleases] { sum += rel } r.mu.proposalQuotaBaseIndex += numReleases r.mu.quotaReleaseQueue = r.mu.quotaReleaseQueue[numReleases:] - r.mu.proposalQuota.add(int64(sum)) + r.mu.proposalQuota.add(sum) } } diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 443394d00170..54eef8e73237 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -169,21 +169,11 @@ func (r *Replica) evalAndPropose( proposal.command.ProposerReplica = repDesc proposal.command.ProposerLeaseSequence = lease.Sequence - // TODO(irfansharif): This int cast indicates that if someone configures a - // very large max proposal size, there is weird overflow behavior and it - // will not work the way it should. - proposalSize := proposal.command.Size() - if proposalSize > int(MaxCommandSize.Get(&r.store.cfg.Settings.SV)) { - // Once a command is written to the raft log, it must be loaded - // into memory and replayed on all replicas. If a command is - // too big, stop it here. - return nil, nil, 0, roachpb.NewError(errors.Errorf( - "command is too large: %d bytes (max: %d)", - proposalSize, MaxCommandSize.Get(&r.store.cfg.Settings.SV), - )) - } - proposal.quotaSize = proposalSize - + // Once a command is written to the raft log, it must be loaded into memory + // and replayed on all replicas. If a command is too big, stop it here. If + // the command is not too big, acquire an appropriate amount of quota from + // the replica's proposal quota pool. + // // TODO(tschottdorf): blocking a proposal here will leave it dangling in the // closed timestamp tracker for an extended period of time, which will in turn // prevent the node-wide closed timestamp from making progress. This is quite @@ -191,7 +181,13 @@ func (r *Replica) evalAndPropose( // closed timestamp tracker is acquired. This is better anyway; right now many // commands can evaluate but then be blocked on quota, which has worse memory // behavior. - if err := r.maybeAcquireProposalQuota(ctx, int64(proposal.quotaSize)); err != nil { + proposal.quotaSize = int64(proposal.command.Size()) + if maxSize := MaxCommandSize.Get(&r.store.cfg.Settings.SV); proposal.quotaSize > maxSize { + return nil, nil, 0, roachpb.NewError(errors.Errorf( + "command is too large: %d bytes (max: %d)", proposal.quotaSize, maxSize, + )) + } + if err := r.maybeAcquireProposalQuota(ctx, proposal.quotaSize); err != nil { return nil, nil, 0, roachpb.NewError(err) } @@ -704,20 +700,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.store.metrics.RaftCommandsApplied.Inc(1) stats.processed++ - r.mu.Lock() - if r.mu.replicaID == r.mu.leaderID { - // At this point we're not guaranteed to have proposalQuota - // initialized, the same is true for quotaReleaseQueue and - // commandSizes. By checking if the specified commandID is - // present in commandSizes, we'll only queue the cmdSize if - // they're all initialized. - if cmdSize, ok := r.mu.commandSizes[commandID]; ok { - r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmdSize) - delete(r.mu.commandSizes, commandID) - } - } - r.mu.Unlock() - case raftpb.EntryConfChange: var cc raftpb.ConfChange if err := protoutil.Unmarshal(e.Data, &cc); err != nil { @@ -744,15 +726,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked( } stats.processed++ - r.mu.Lock() - if r.mu.replicaID == r.mu.leaderID { - if cmdSize, ok := r.mu.commandSizes[commandID]; ok { - r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmdSize) - delete(r.mu.commandSizes, commandID) - } - } - r.mu.Unlock() - if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { raftGroup.ApplyConfChange(cc) return true, nil @@ -1630,6 +1603,13 @@ func (r *Replica) processRaftCommand( // We initiated this command, so use the caller-supplied context. ctx = proposal.ctx delete(r.mu.proposals, idKey) + + // At this point we're not guaranteed to have proposalQuota initialized, + // the same is true for quotaReleaseQueues. Only queue the proposal's + // quota for release if the proposalQuota is initialized. + if r.mu.proposalQuota != nil { + r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, proposal.quotaSize) + } } leaseIndex, proposalRetry, forcedErr := r.checkForcedErrLocked(ctx, idKey, raftCmd, proposal, proposedLocally)