storage: remove commandSizes map, use quota size on ProposalData
This commit addresses a TODO added in the previous commit: remove the
commandSizes map in favor of the quota size field tracked directly on
the ProposalData object. This lets us avoid an extra exclusive lock
acquisition on each applied command, reducing contention between
below-Raft processing and above-Raft processing (which often holds the
lock in shared mode).

Release note: None
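To make the bookkeeping change concrete, here is a minimal sketch of the new flow (illustrative only, not the CockroachDB implementation; the simplified replica/proposalData types and the absence of locking and of a real quota pool are assumptions): the quota size is recorded on the proposal when quota is acquired, appended to quotaReleaseQueue when the command is applied, and summed back into the pool once all replicas have persisted the entry; no side map keyed by command ID is needed.

package main

import "fmt"

type cmdID string

// proposalData mirrors the relevant fields of ProposalData in this diff:
// the quota size travels on the proposal itself.
type proposalData struct {
	idKey     cmdID
	quotaSize int64 // size recorded when quota was acquired
}

// replica is a toy stand-in for the leader's quota bookkeeping.
type replica struct {
	quotaAvailable    int64
	quotaReleaseQueue []int64
}

// propose acquires quota using the size stored on the proposal.
func (r *replica) propose(p *proposalData) {
	r.quotaAvailable -= p.quotaSize
}

// applied is called when the local proposal comes "out of Raft"; the quota
// size is read straight off the proposal and queued for later release.
func (r *replica) applied(p *proposalData) {
	r.quotaReleaseQueue = append(r.quotaReleaseQueue, p.quotaSize)
}

// releaseQuota returns queued quota to the pool, which in the real code
// happens only once all replicas have persisted the corresponding entries.
func (r *replica) releaseQuota() {
	var sum int64
	for _, rel := range r.quotaReleaseQueue {
		sum += rel
	}
	r.quotaReleaseQueue = nil
	r.quotaAvailable += sum
}

func main() {
	r := &replica{quotaAvailable: 1 << 20}
	p := &proposalData{idKey: "cmd-1", quotaSize: 512}
	r.propose(p)
	r.applied(p)
	r.releaseQuota()
	fmt.Println(r.quotaAvailable) // back to 1048576
}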
nvanbenschoten committed Jun 26, 2019
1 parent 1ff3556 commit faff108
Showing 7 changed files with 29 additions and 81 deletions.
6 changes: 0 additions & 6 deletions pkg/storage/client_raft_test.go
@@ -2171,9 +2171,6 @@ func TestQuotaPool(t *testing.T) {
if qLen := leaderRepl.QuotaReleaseQueueLen(); qLen != 1 {
return errors.Errorf("expected 1 queued quota release, found: %d", qLen)
}
if cLen := leaderRepl.CommandSizesLen(); cLen != 0 {
return errors.Errorf("expected zero-length command sizes map, found %d", cLen)
}
return nil
})

@@ -2196,9 +2193,6 @@ func TestQuotaPool(t *testing.T) {
if qLen := leaderRepl.QuotaReleaseQueueLen(); qLen != 0 {
return errors.Errorf("expected no queued quota releases, found: %d", qLen)
}
if cLen := leaderRepl.CommandSizesLen(); cLen != 0 {
return errors.Errorf("expected zero-length command sizes map, found %d", cLen)
}
return nil
})

8 changes: 0 additions & 8 deletions pkg/storage/helpers_test.go
@@ -32,7 +32,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/rditer"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -273,7 +272,6 @@ func (r *Replica) InitQuotaPool(quota int64) {
}
r.mu.proposalQuota = newQuotaPool(quota)
r.mu.quotaReleaseQueue = nil
r.mu.commandSizes = make(map[storagebase.CmdIDKey]int)
}

// QuotaAvailable returns the quota available in the replica's quota pool. Only
@@ -296,12 +294,6 @@ func (r *Replica) IsFollowerActive(ctx context.Context, followerID roachpb.Repli
return r.mu.lastUpdateTimes.isFollowerActive(ctx, followerID, timeutil.Now())
}

func (r *Replica) CommandSizesLen() int {
r.mu.Lock()
defer r.mu.Unlock()
return len(r.mu.commandSizes)
}

// GetTSCacheHighWater returns the high water mark of the replica's timestamp
// cache.
func (r *Replica) GetTSCacheHighWater() hlc.Timestamp {
20 changes: 7 additions & 13 deletions pkg/storage/replica.go
@@ -398,16 +398,11 @@ type Replica struct {

proposalQuotaBaseIndex uint64

// For command size based allocations we keep track of the sizes of all
// in-flight commands.
commandSizes map[storagebase.CmdIDKey]int

// Once the leader observes a proposal come 'out of Raft', we consult
// the 'commandSizes' map to determine the size of the associated
// command and add it to a queue of quotas we have yet to release back
// to the quota pool. We only do so when all replicas have persisted
// the corresponding entry into their logs.
quotaReleaseQueue []int
// Once the leader observes a proposal come 'out of Raft', we add the
// size of the associated command to a queue of quotas we have yet to
// release back to the quota pool. We only do so when all replicas have
// persisted the corresponding entry into their logs.
quotaReleaseQueue []int64

// Counts calls to Replica.tick()
ticks int
@@ -588,9 +583,8 @@ func (r *Replica) cleanupFailedProposalLocked(p *ProposalData) {
// NB: We may be double free-ing here in cases where proposals are
// duplicated. To counter this our quota pool is capped at the initial
// quota size.
if cmdSize, ok := r.mu.commandSizes[p.idKey]; ok {
r.mu.proposalQuota.add(int64(cmdSize))
delete(r.mu.commandSizes, p.idKey)
if r.mu.proposalQuota != nil {
r.mu.proposalQuota.add(p.quotaSize)
}
}

4 changes: 1 addition & 3 deletions pkg/storage/replica_proposal.go
@@ -78,9 +78,7 @@ type ProposalData struct {
// quotaSize is the encoded size of command that was used to acquire
// proposal quota. command.Size can change slightly as the object is
// mutated, so it's safer to record the exact value used here.
// TODO(nvanbenschoten): we're already tracking this here, so why do
// we need the separate commandSizes map? Let's get rid of it.
quotaSize int
quotaSize int64

// tmpFooter is used to avoid an allocation.
tmpFooter storagepb.RaftCommandFooter
3 changes: 0 additions & 3 deletions pkg/storage/replica_proposal_buf.go
@@ -599,7 +599,4 @@ func (rp *replicaProposer) registerProposalLocked(p *ProposalData) {
// decide if/when to re-propose it.
p.proposedAtTicks = rp.mu.ticks
rp.mu.proposals[p.idKey] = p
if rp.mu.commandSizes != nil {
rp.mu.commandSizes[p.idKey] = p.quotaSize
}
}
11 changes: 2 additions & 9 deletions pkg/storage/replica_proposal_quota.go
@@ -17,7 +17,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"go.etcd.io/etcd/raft"
@@ -99,7 +98,6 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
r.mu.proposalQuota = nil
r.mu.lastUpdateTimes = nil
r.mu.quotaReleaseQueue = nil
r.mu.commandSizes = nil
return
}

@@ -114,9 +112,6 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
if releaseQueueLen := len(r.mu.quotaReleaseQueue); releaseQueueLen != 0 {
log.Fatalf(ctx, "len(r.mu.quotaReleaseQueue) = %d, expected 0", releaseQueueLen)
}
if commandSizesLen := len(r.mu.commandSizes); commandSizesLen != 0 {
log.Fatalf(ctx, "len(r.mu.commandSizes) = %d, expected 0", commandSizesLen)
}

// Raft may propose commands itself (specifically the empty
// commands when leadership changes), and these commands don't go
@@ -126,7 +121,6 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
r.mu.proposalQuota = newQuotaPool(r.store.cfg.RaftProposalQuota)
r.mu.lastUpdateTimes = make(map[roachpb.ReplicaID]time.Time)
r.mu.lastUpdateTimes.updateOnBecomeLeader(r.mu.state.Desc.Replicas().Unwrap(), timeutil.Now())
r.mu.commandSizes = make(map[storagebase.CmdIDKey]int)
} else if r.mu.proposalQuota != nil {
// We're becoming a follower.

@@ -136,7 +130,6 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
r.mu.proposalQuota = nil
r.mu.lastUpdateTimes = nil
r.mu.quotaReleaseQueue = nil
r.mu.commandSizes = nil
}
return
} else if r.mu.proposalQuota == nil {
@@ -237,13 +230,13 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
if qLen := uint64(len(r.mu.quotaReleaseQueue)); qLen < numReleases {
numReleases = qLen
}
sum := 0
sum := int64(0)
for _, rel := range r.mu.quotaReleaseQueue[:numReleases] {
sum += rel
}
r.mu.proposalQuotaBaseIndex += numReleases
r.mu.quotaReleaseQueue = r.mu.quotaReleaseQueue[numReleases:]

r.mu.proposalQuota.add(int64(sum))
r.mu.proposalQuota.add(sum)
}
}
58 changes: 19 additions & 39 deletions pkg/storage/replica_raft.go
@@ -169,29 +169,25 @@ func (r *Replica) evalAndPropose(
proposal.command.ProposerReplica = repDesc
proposal.command.ProposerLeaseSequence = lease.Sequence

// TODO(irfansharif): This int cast indicates that if someone configures a
// very large max proposal size, there is weird overflow behavior and it
// will not work the way it should.
proposalSize := proposal.command.Size()
if proposalSize > int(MaxCommandSize.Get(&r.store.cfg.Settings.SV)) {
// Once a command is written to the raft log, it must be loaded
// into memory and replayed on all replicas. If a command is
// too big, stop it here.
return nil, nil, 0, roachpb.NewError(errors.Errorf(
"command is too large: %d bytes (max: %d)",
proposalSize, MaxCommandSize.Get(&r.store.cfg.Settings.SV),
))
}
proposal.quotaSize = proposalSize

// Once a command is written to the raft log, it must be loaded into memory
// and replayed on all replicas. If a command is too big, stop it here. If
// the command is not too big, acquire an appropriate amount of quota from
// the replica's proposal quota pool.
//
// TODO(tschottdorf): blocking a proposal here will leave it dangling in the
// closed timestamp tracker for an extended period of time, which will in turn
// prevent the node-wide closed timestamp from making progress. This is quite
// unfortunate; we should hoist the quota pool before the reference with the
// closed timestamp tracker is acquired. This is better anyway; right now many
// commands can evaluate but then be blocked on quota, which has worse memory
// behavior.
if err := r.maybeAcquireProposalQuota(ctx, int64(proposal.quotaSize)); err != nil {
proposal.quotaSize = int64(proposal.command.Size())
if maxSize := MaxCommandSize.Get(&r.store.cfg.Settings.SV); proposal.quotaSize > maxSize {
return nil, nil, 0, roachpb.NewError(errors.Errorf(
"command is too large: %d bytes (max: %d)", proposal.quotaSize, maxSize,
))
}
if err := r.maybeAcquireProposalQuota(ctx, proposal.quotaSize); err != nil {
return nil, nil, 0, roachpb.NewError(err)
}

@@ -704,20 +700,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
r.store.metrics.RaftCommandsApplied.Inc(1)
stats.processed++

r.mu.Lock()
if r.mu.replicaID == r.mu.leaderID {
// At this point we're not guaranteed to have proposalQuota
// initialized, the same is true for quotaReleaseQueue and
// commandSizes. By checking if the specified commandID is
// present in commandSizes, we'll only queue the cmdSize if
// they're all initialized.
if cmdSize, ok := r.mu.commandSizes[commandID]; ok {
r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmdSize)
delete(r.mu.commandSizes, commandID)
}
}
r.mu.Unlock()

case raftpb.EntryConfChange:
var cc raftpb.ConfChange
if err := protoutil.Unmarshal(e.Data, &cc); err != nil {
@@ -744,15 +726,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
}
stats.processed++

r.mu.Lock()
if r.mu.replicaID == r.mu.leaderID {
if cmdSize, ok := r.mu.commandSizes[commandID]; ok {
r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmdSize)
delete(r.mu.commandSizes, commandID)
}
}
r.mu.Unlock()

if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) {
raftGroup.ApplyConfChange(cc)
return true, nil
@@ -1630,6 +1603,13 @@ func (r *Replica) processRaftCommand(
// We initiated this command, so use the caller-supplied context.
ctx = proposal.ctx
delete(r.mu.proposals, idKey)

// At this point we're not guaranteed to have proposalQuota initialized,
// the same is true for quotaReleaseQueues. Only queue the proposal's
// quota for release if the proposalQuota is initialized.
if r.mu.proposalQuota != nil {
r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, proposal.quotaSize)
}
}

leaseIndex, proposalRetry, forcedErr := r.checkForcedErrLocked(ctx, idKey, raftCmd, proposal, proposedLocally)
