From 192a828813ab28c8065c92deeab177daaaf0f680 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Mon, 23 Jul 2018 09:35:23 -0400
Subject: [PATCH] storage: s/replica.mu.proposals/replica.mu.localProposals/

Release note: None
---
 pkg/storage/replica.go          | 35 +++++++++++++++++------------------
 pkg/storage/replica_sideload.go |  2 +-
 pkg/storage/replica_test.go     | 10 +++++-----
 3 files changed, 23 insertions(+), 24 deletions(-)

diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index 6e2474d2e0c1..40908178f4ed 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -373,16 +373,15 @@ type Replica struct {
 		minLeaseProposedTS hlc.Timestamp
 		// Max bytes before split.
 		maxBytes int64
-		// proposals stores the Raft in-flight commands which
-		// originated at this Replica, i.e. all commands for which
-		// propose has been called, but which have not yet
-		// applied.
+		// localProposals stores the Raft in-flight commands which originated at
+		// this Replica, i.e. all commands for which propose has been called,
+		// but which have not yet applied.
 		//
 		// The *ProposalData in the map are "owned" by it. Elements from the
 		// map must only be referenced while Replica.mu is held, except if the
 		// element is removed from the map first. The notable exception is the
 		// contained RaftCommand, which we treat as immutable.
-		proposals map[storagebase.CmdIDKey]*ProposalData
+		localProposals map[storagebase.CmdIDKey]*ProposalData
 		// remoteProposals is maintained by Raft leaders and stores in-flight
 		// commands that were forwarded to the leader during its current term.
 		// The set allows leaders to detect duplicate forwarded commands and
@@ -696,7 +695,7 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked(
 	r.cmdQMu.queues[spanset.SpanLocal] = NewCommandQueue(false /* optimizeOverlap */)
 	r.cmdQMu.Unlock()
 
-	r.mu.proposals = map[storagebase.CmdIDKey]*ProposalData{}
+	r.mu.localProposals = map[storagebase.CmdIDKey]*ProposalData{}
 	r.mu.checksums = map[uuid.UUID]ReplicaChecksum{}
 	// Clear the internal raft group in case we're being reset. Since we're
 	// reloading the raft state below, it isn't safe to use the existing raft
@@ -852,7 +851,7 @@ func (r *Replica) cancelPendingCommandsLocked() {
 		Err:           roachpb.NewError(roachpb.NewAmbiguousResultError("removing replica")),
 		ProposalRetry: proposalRangeNoLongerExists,
 	}
-	for _, p := range r.mu.proposals {
+	for _, p := range r.mu.localProposals {
 		r.cleanupFailedProposalLocked(p)
 		p.finishApplication(pr)
 	}
@@ -864,7 +863,7 @@ func (r *Replica) cleanupFailedProposalLocked(p *ProposalData) {
 	// Clear the proposal from the proposals map. May be a no-op if the
 	// proposal has not yet been inserted into the map.
-	delete(r.mu.proposals, p.idKey)
+	delete(r.mu.localProposals, p.idKey)
 	// Release associated quota pool resources if we have been tracking
 	// this command.
 	//
@@ -1866,7 +1865,7 @@ func (r *Replica) State() storagebase.RangeInfo {
 	var ri storagebase.RangeInfo
 	ri.ReplicaState = *(protoutil.Clone(&r.mu.state)).(*storagebase.ReplicaState)
 	ri.LastIndex = r.mu.lastIndex
-	ri.NumPending = uint64(len(r.mu.proposals))
+	ri.NumPending = uint64(len(r.mu.localProposals))
 	ri.RaftLogSize = r.mu.raftLogSize
 	ri.NumDropped = uint64(r.mu.droppedMessages)
 	if r.mu.proposalQuota != nil {
@@ -3344,11 +3343,11 @@ func (r *Replica) insertProposalLocked(
 			proposal.idKey, proposal.command.MaxLeaseIndex)
 	}
 
-	if _, ok := r.mu.proposals[proposal.idKey]; ok {
+	if _, ok := r.mu.localProposals[proposal.idKey]; ok {
 		ctx := r.AnnotateCtx(context.TODO())
 		log.Fatalf(ctx, "pending command already exists for %s", proposal.idKey)
 	}
-	r.mu.proposals[proposal.idKey] = proposal
+	r.mu.localProposals[proposal.idKey] = proposal
 }
 
 func makeIDKey() storagebase.CmdIDKey {
@@ -3556,7 +3555,7 @@ func (r *Replica) propose(
 	// range.
 	tryAbandon := func() bool {
 		r.mu.Lock()
-		p, ok := r.mu.proposals[idKey]
+		p, ok := r.mu.localProposals[idKey]
 		if ok {
 			// TODO(radu): Should this context be created via tracer.ForkCtxSpan?
 			// We'd need to make sure the span is finished eventually.
@@ -3683,9 +3682,9 @@ func (r *Replica) quiesce() bool {
 
 func (r *Replica) quiesceLocked() bool {
 	ctx := r.AnnotateCtx(context.TODO())
-	if len(r.mu.proposals) != 0 {
+	if len(r.mu.localProposals) != 0 {
 		if log.V(3) {
-			log.Infof(ctx, "not quiescing: %d pending commands", len(r.mu.proposals))
+			log.Infof(ctx, "not quiescing: %d pending commands", len(r.mu.localProposals))
 		}
 		return false
 	}
@@ -4339,7 +4338,7 @@ func (r *Replica) tick(livenessMap map[roachpb.NodeID]bool) (bool, error) {
 // correctness issues.
 func (r *Replica) maybeQuiesceLocked(livenessMap map[roachpb.NodeID]bool) bool {
 	ctx := r.AnnotateCtx(context.TODO())
-	status, ok := shouldReplicaQuiesce(ctx, r, r.store.Clock().Now(), len(r.mu.proposals), livenessMap)
+	status, ok := shouldReplicaQuiesce(ctx, r, r.store.Clock().Now(), len(r.mu.localProposals), livenessMap)
 	if !ok {
 		return false
 	}
@@ -4600,7 +4599,7 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR
 
 	numShouldRetry := 0
 	var reproposals pendingCmdSlice
-	for _, p := range r.mu.proposals {
+	for _, p := range r.mu.localProposals {
 		if p.command.MaxLeaseIndex == 0 {
 			// Commands without a MaxLeaseIndex cannot be reproposed, as they might
 			// apply twice. We also don't want to ask the proposer to retry these
@@ -5070,14 +5069,14 @@ func (r *Replica) processRaftCommand(
 	}
 
 	r.mu.Lock()
-	proposal, proposedLocally := r.mu.proposals[idKey]
+	proposal, proposedLocally := r.mu.localProposals[idKey]
 	// TODO(tschottdorf): consider the Trace situation here.
 	if proposedLocally {
 		// We initiated this command, so use the caller-supplied context.
 		ctx = proposal.ctx
 		proposal.ctx = nil // avoid confusion
-		delete(r.mu.proposals, idKey)
+		delete(r.mu.localProposals, idKey)
 	}
 	// Delete the entry for a forwarded proposal set.
diff --git a/pkg/storage/replica_sideload.go b/pkg/storage/replica_sideload.go
index 12962034a976..571cb3116335 100644
--- a/pkg/storage/replica_sideload.go
+++ b/pkg/storage/replica_sideload.go
@@ -70,7 +70,7 @@ func (r *Replica) maybeSideloadEntriesRaftMuLocked(
 	maybeRaftCommand := func(cmdID storagebase.CmdIDKey) (storagebase.RaftCommand, bool) {
 		r.mu.Lock()
 		defer r.mu.Unlock()
-		cmd, ok := r.mu.proposals[cmdID]
+		cmd, ok := r.mu.localProposals[cmdID]
 		if ok {
 			return *cmd.command, true
 		}
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index f02895fdf23b..9b25207305af 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -7455,7 +7455,7 @@ func TestReplicaTryAbandon(t *testing.T) {
 	func() {
 		tc.repl.mu.Lock()
 		defer tc.repl.mu.Unlock()
-		if len(tc.repl.mu.proposals) == 0 {
+		if len(tc.repl.mu.localProposals) == 0 {
 			t.Fatal("expected non-empty proposals map")
 		}
 	}()
@@ -8010,7 +8010,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
 	}
 
 	tc.repl.mu.Lock()
-	for _, p := range tc.repl.mu.proposals {
+	for _, p := range tc.repl.mu.localProposals {
 		if v := p.ctx.Value(magicKey{}); v != nil {
 			origIndexes = append(origIndexes, int(p.command.MaxLeaseIndex))
 		}
@@ -8042,13 +8042,13 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
 
 	tc.repl.mu.Lock()
 	defer tc.repl.mu.Unlock()
-	nonePending := len(tc.repl.mu.proposals) == 0
+	nonePending := len(tc.repl.mu.localProposals) == 0
 	c := int(tc.repl.mu.lastAssignedLeaseIndex) - int(tc.repl.mu.state.LeaseAppliedIndex)
 	if nonePending && c > 0 {
 		t.Errorf("no pending cmds, but have required index offset %d", c)
 	}
 	if !nonePending {
-		t.Fatalf("still pending commands: %+v", tc.repl.mu.proposals)
+		t.Fatalf("still pending commands: %+v", tc.repl.mu.localProposals)
 	}
 }
@@ -8206,7 +8206,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
 		}
 		// Build the map of expected reproposals at this stage.
 		m := map[storagebase.CmdIDKey]int{}
-		for id, p := range r.mu.proposals {
+		for id, p := range r.mu.localProposals {
 			m[id] = p.proposedAtTicks
 		}
 		r.mu.Unlock()
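
For context: the comment on localProposals in the first hunk describes the pattern this rename touches, namely a mutex-protected map of in-flight commands keyed by command ID that is populated when a command is proposed and cleared when the command applies or the proposal is abandoned. Below is a minimal, self-contained Go sketch of that general pattern only; every name in it (replica, proposalData, cmdIDKey, propose, applyCommand) is a hypothetical simplification for illustration and is not CockroachDB's actual API or behavior.

package main

import (
	"fmt"
	"sync"
)

// cmdIDKey identifies a proposed command (stand-in for storagebase.CmdIDKey).
type cmdIDKey string

// proposalData is a simplified handle for an in-flight proposal.
type proposalData struct {
	idKey   cmdIDKey
	command string      // stand-in for the real Raft command payload
	done    chan string // receives the outcome exactly once
}

// replica tracks the proposals it originated in a map guarded by a mutex;
// entries must only be touched while the mutex is held.
type replica struct {
	mu struct {
		sync.Mutex
		localProposals map[cmdIDKey]*proposalData
	}
}

func newReplica() *replica {
	r := &replica{}
	r.mu.localProposals = map[cmdIDKey]*proposalData{}
	return r
}

// propose registers a command as in-flight and returns its proposal handle.
func (r *replica) propose(id cmdIDKey, command string) *proposalData {
	p := &proposalData{idKey: id, command: command, done: make(chan string, 1)}
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.mu.localProposals[id]; ok {
		panic(fmt.Sprintf("pending command already exists for %s", id))
	}
	r.mu.localProposals[id] = p
	return p
}

// applyCommand simulates a command coming back out of the log: if it was
// proposed locally, the entry is removed and the waiting proposer is signaled.
func (r *replica) applyCommand(id cmdIDKey) {
	r.mu.Lock()
	p, proposedLocally := r.mu.localProposals[id]
	if proposedLocally {
		delete(r.mu.localProposals, id)
	}
	r.mu.Unlock()
	if proposedLocally {
		p.done <- "applied: " + p.command
	}
}

func main() {
	r := newReplica()
	p := r.propose("cmd-1", "Put(k, v)")
	r.applyCommand("cmd-1")
	fmt.Println(<-p.done) // prints "applied: Put(k, v)"
}

In the sketch, the buffered done channel stands in for the proposal's result path: whichever goroutine removes the entry from the map is the one that signals the waiting proposer, which is why entries are only read or deleted while the mutex is held.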