Skip to content

Commit

Permalink
storage: s/replica.mu.proposals/replica.mu.localProposals/
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
nvanbenschoten committed Jul 23, 2018
1 parent 03b116f commit 192a828
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 24 deletions.
35 changes: 17 additions & 18 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,16 +373,15 @@ type Replica struct {
minLeaseProposedTS hlc.Timestamp
// Max bytes before split.
maxBytes int64
// proposals stores the Raft in-flight commands which
// originated at this Replica, i.e. all commands for which
// propose has been called, but which have not yet
// applied.
// localProposals stores the Raft in-flight commands which originated at
// this Replica, i.e. all commands for which propose has been called,
// but which have not yet applied.
//
// The *ProposalData in the map are "owned" by it. Elements from the
// map must only be referenced while Replica.mu is held, except if the
// element is removed from the map first. The notable exception is the
// contained RaftCommand, which we treat as immutable.
proposals map[storagebase.CmdIDKey]*ProposalData
localProposals map[storagebase.CmdIDKey]*ProposalData
// remoteProposals is maintained by Raft leaders and stores in-flight
// commands that were forwarded to the leader during its current term.
// The set allows leaders to detect duplicate forwarded commands and
Expand Down Expand Up @@ -696,7 +695,7 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked(
r.cmdQMu.queues[spanset.SpanLocal] = NewCommandQueue(false /* optimizeOverlap */)
r.cmdQMu.Unlock()

r.mu.proposals = map[storagebase.CmdIDKey]*ProposalData{}
r.mu.localProposals = map[storagebase.CmdIDKey]*ProposalData{}
r.mu.checksums = map[uuid.UUID]ReplicaChecksum{}
// Clear the internal raft group in case we're being reset. Since we're
// reloading the raft state below, it isn't safe to use the existing raft
Expand Down Expand Up @@ -852,7 +851,7 @@ func (r *Replica) cancelPendingCommandsLocked() {
Err: roachpb.NewError(roachpb.NewAmbiguousResultError("removing replica")),
ProposalRetry: proposalRangeNoLongerExists,
}
for _, p := range r.mu.proposals {
for _, p := range r.mu.localProposals {
r.cleanupFailedProposalLocked(p)
p.finishApplication(pr)
}
Expand All @@ -864,7 +863,7 @@ func (r *Replica) cancelPendingCommandsLocked() {
func (r *Replica) cleanupFailedProposalLocked(p *ProposalData) {
// Clear the proposal from the proposals map. May be a no-op if the
// proposal has not yet been inserted into the map.
delete(r.mu.proposals, p.idKey)
delete(r.mu.localProposals, p.idKey)
// Release associated quota pool resources if we have been tracking
// this command.
//
Expand Down Expand Up @@ -1866,7 +1865,7 @@ func (r *Replica) State() storagebase.RangeInfo {
var ri storagebase.RangeInfo
ri.ReplicaState = *(protoutil.Clone(&r.mu.state)).(*storagebase.ReplicaState)
ri.LastIndex = r.mu.lastIndex
ri.NumPending = uint64(len(r.mu.proposals))
ri.NumPending = uint64(len(r.mu.localProposals))
ri.RaftLogSize = r.mu.raftLogSize
ri.NumDropped = uint64(r.mu.droppedMessages)
if r.mu.proposalQuota != nil {
Expand Down Expand Up @@ -3344,11 +3343,11 @@ func (r *Replica) insertProposalLocked(
proposal.idKey, proposal.command.MaxLeaseIndex)
}

if _, ok := r.mu.proposals[proposal.idKey]; ok {
if _, ok := r.mu.localProposals[proposal.idKey]; ok {
ctx := r.AnnotateCtx(context.TODO())
log.Fatalf(ctx, "pending command already exists for %s", proposal.idKey)
}
r.mu.proposals[proposal.idKey] = proposal
r.mu.localProposals[proposal.idKey] = proposal
}

func makeIDKey() storagebase.CmdIDKey {
Expand Down Expand Up @@ -3556,7 +3555,7 @@ func (r *Replica) propose(
// range.
tryAbandon := func() bool {
r.mu.Lock()
p, ok := r.mu.proposals[idKey]
p, ok := r.mu.localProposals[idKey]
if ok {
// TODO(radu): Should this context be created via tracer.ForkCtxSpan?
// We'd need to make sure the span is finished eventually.
Expand Down Expand Up @@ -3683,9 +3682,9 @@ func (r *Replica) quiesce() bool {

func (r *Replica) quiesceLocked() bool {
ctx := r.AnnotateCtx(context.TODO())
if len(r.mu.proposals) != 0 {
if len(r.mu.localProposals) != 0 {
if log.V(3) {
log.Infof(ctx, "not quiescing: %d pending commands", len(r.mu.proposals))
log.Infof(ctx, "not quiescing: %d pending commands", len(r.mu.localProposals))
}
return false
}
Expand Down Expand Up @@ -4339,7 +4338,7 @@ func (r *Replica) tick(livenessMap map[roachpb.NodeID]bool) (bool, error) {
// correctness issues.
func (r *Replica) maybeQuiesceLocked(livenessMap map[roachpb.NodeID]bool) bool {
ctx := r.AnnotateCtx(context.TODO())
status, ok := shouldReplicaQuiesce(ctx, r, r.store.Clock().Now(), len(r.mu.proposals), livenessMap)
status, ok := shouldReplicaQuiesce(ctx, r, r.store.Clock().Now(), len(r.mu.localProposals), livenessMap)
if !ok {
return false
}
Expand Down Expand Up @@ -4600,7 +4599,7 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR

numShouldRetry := 0
var reproposals pendingCmdSlice
for _, p := range r.mu.proposals {
for _, p := range r.mu.localProposals {
if p.command.MaxLeaseIndex == 0 {
// Commands without a MaxLeaseIndex cannot be reproposed, as they might
// apply twice. We also don't want to ask the proposer to retry these
Expand Down Expand Up @@ -5070,14 +5069,14 @@ func (r *Replica) processRaftCommand(
}

r.mu.Lock()
proposal, proposedLocally := r.mu.proposals[idKey]
proposal, proposedLocally := r.mu.localProposals[idKey]

// TODO(tschottdorf): consider the Trace situation here.
if proposedLocally {
// We initiated this command, so use the caller-supplied context.
ctx = proposal.ctx
proposal.ctx = nil // avoid confusion
delete(r.mu.proposals, idKey)
delete(r.mu.localProposals, idKey)
}

// Delete the entry for a forwarded proposal set.
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/replica_sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (r *Replica) maybeSideloadEntriesRaftMuLocked(
maybeRaftCommand := func(cmdID storagebase.CmdIDKey) (storagebase.RaftCommand, bool) {
r.mu.Lock()
defer r.mu.Unlock()
cmd, ok := r.mu.proposals[cmdID]
cmd, ok := r.mu.localProposals[cmdID]
if ok {
return *cmd.command, true
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7455,7 +7455,7 @@ func TestReplicaTryAbandon(t *testing.T) {
func() {
tc.repl.mu.Lock()
defer tc.repl.mu.Unlock()
if len(tc.repl.mu.proposals) == 0 {
if len(tc.repl.mu.localProposals) == 0 {
t.Fatal("expected non-empty proposals map")
}
}()
Expand Down Expand Up @@ -8010,7 +8010,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
}

tc.repl.mu.Lock()
for _, p := range tc.repl.mu.proposals {
for _, p := range tc.repl.mu.localProposals {
if v := p.ctx.Value(magicKey{}); v != nil {
origIndexes = append(origIndexes, int(p.command.MaxLeaseIndex))
}
Expand Down Expand Up @@ -8042,13 +8042,13 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {

tc.repl.mu.Lock()
defer tc.repl.mu.Unlock()
nonePending := len(tc.repl.mu.proposals) == 0
nonePending := len(tc.repl.mu.localProposals) == 0
c := int(tc.repl.mu.lastAssignedLeaseIndex) - int(tc.repl.mu.state.LeaseAppliedIndex)
if nonePending && c > 0 {
t.Errorf("no pending cmds, but have required index offset %d", c)
}
if !nonePending {
t.Fatalf("still pending commands: %+v", tc.repl.mu.proposals)
t.Fatalf("still pending commands: %+v", tc.repl.mu.localProposals)
}
}

Expand Down Expand Up @@ -8206,7 +8206,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
}
// Build the map of expected reproposals at this stage.
m := map[storagebase.CmdIDKey]int{}
for id, p := range r.mu.proposals {
for id, p := range r.mu.localProposals {
m[id] = p.proposedAtTicks
}
r.mu.Unlock()
Expand Down

0 comments on commit 192a828

Please sign in to comment.