*: migrate some uses of ReplicaDescriptors.Unwrap to All or Voters
Touches #37916

Release note: None
danhhz committed Jul 15, 2019
1 parent 71d49de commit 4857fa5
Showing 19 changed files with 95 additions and 58 deletions.
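For orientation: call sites that enumerate range membership (validation, metrics, raft ConfState) move from `Unwrap` to `All`, while request-routing call sites move to `Voters`, because learner replicas can't serve reads or writes. Below is a minimal sketch of that decision rule using stand-in types; only the accessor names come from this commit.

package main

import "fmt"

// Stand-in replica type for illustration; the Learner field is an assumption,
// not the roachpb schema.
type replica struct {
	StoreID int
	Learner bool
}

// forRouting mirrors the Unwrap -> Voters migration in DistSender and the
// replica oracle: keep only replicas that can serve reads/writes.
func forRouting(all []replica) []replica {
	voters := make([]replica, 0, len(all))
	for _, r := range all {
		if !r.Learner {
			voters = append(voters, r)
		}
	}
	return voters
}

func main() {
	all := []replica{{StoreID: 1}, {StoreID: 2}, {StoreID: 3, Learner: true}}
	// Membership bookkeeping (Unwrap -> All) walks all three replicas;
	// routing (Unwrap -> Voters) skips the learner on store 3.
	fmt.Println(len(all), "replicas;", len(forRouting(all)), "routable")
}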
5 changes: 3 additions & 2 deletions pkg/cli/debug.go
@@ -1163,8 +1163,9 @@ func removeDeadReplicas(
 	err = storage.IterateRangeDescriptors(ctx, db, func(desc roachpb.RangeDescriptor) (bool, error) {
 		hasSelf := false
 		numDeadPeers := 0
-		numReplicas := len(desc.Replicas().Unwrap())
-		for _, rep := range desc.Replicas().Unwrap() {
+		allReplicas := desc.Replicas().All()
+		numReplicas := len(allReplicas)
+		for _, rep := range allReplicas {
 			if rep.StoreID == storeIdent.StoreID {
 				hasSelf = true
 			}
6 changes: 4 additions & 2 deletions pkg/kv/dist_sender.go
@@ -487,8 +487,10 @@ func (ds *DistSender) getDescriptor(
 func (ds *DistSender) sendSingleRange(
 	ctx context.Context, ba roachpb.BatchRequest, desc *roachpb.RangeDescriptor, withCommit bool,
 ) (*roachpb.BatchResponse, *roachpb.Error) {
-	// Try to send the call.
-	replicas := NewReplicaSlice(ds.gossip, desc)
+	// Try to send the call. Learner replicas won't serve reads/writes, so send
+	// only to the `Voters` replicas. This is just an optimization to save a
+	// network hop, everything would still work if we had `All` here.
+	replicas := NewReplicaSlice(ds.gossip, desc.Replicas().Voters())
 
 	// If this request needs to go to a lease holder and we know who that is, move
 	// it to the front.
5 changes: 4 additions & 1 deletion pkg/kv/dist_sender_rangefeed.go
@@ -222,7 +222,10 @@ func (ds *DistSender) singleRangeFeed(
 	if ds.rpcContext != nil {
 		latencyFn = ds.rpcContext.RemoteClocks.Latency
 	}
-	replicas := NewReplicaSlice(ds.gossip, desc)
+	// Learner replicas won't serve reads/writes, so send only to the `Voters`
+	// replicas. This is just an optimization to save a network hop, everything
+	// would still work if we had `All` here.
+	replicas := NewReplicaSlice(ds.gossip, desc.Replicas().Voters())
 	replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn)
 
 	transport, err := ds.transportFactory(SendOptions{}, ds.nodeDialer, replicas)
10 changes: 5 additions & 5 deletions pkg/kv/replica_slice.go
@@ -42,25 +42,25 @@ type ReplicaSlice []ReplicaInfo
 // NewReplicaSlice creates a ReplicaSlice from the replicas listed in the range
 // descriptor and using gossip to lookup node descriptors. Replicas on nodes
 // that are not gossiped are omitted from the result.
-func NewReplicaSlice(gossip *gossip.Gossip, desc *roachpb.RangeDescriptor) ReplicaSlice {
+func NewReplicaSlice(gossip *gossip.Gossip, replicas []roachpb.ReplicaDescriptor) ReplicaSlice {
 	if gossip == nil {
 		return nil
 	}
-	replicas := make(ReplicaSlice, 0, len(desc.Replicas().Unwrap()))
-	for _, r := range desc.Replicas().Unwrap() {
+	rs := make(ReplicaSlice, 0, len(replicas))
+	for _, r := range replicas {
 		nd, err := gossip.GetNodeDescriptor(r.NodeID)
 		if err != nil {
 			if log.V(1) {
 				log.Infof(context.TODO(), "node %d is not gossiped: %v", r.NodeID, err)
 			}
			continue
 		}
-		replicas = append(replicas, ReplicaInfo{
+		rs = append(rs, ReplicaInfo{
 			ReplicaDescriptor: r,
 			NodeDesc:          nd,
 		})
 	}
-	return replicas
+	return rs
 }
 
 // ReplicaSlice implements shuffle.Interface.
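A hedged usage sketch of the new signature follows; the helper name is hypothetical, but NewReplicaSlice, Voters, and All are the identifiers from this diff.

// Hypothetical caller (not in the commit) showing the intended use of the new
// NewReplicaSlice signature: routing paths pass the voters, while a caller
// that wants every replica would pass desc.Replicas().All() instead.
func replicaSliceForRouting(g *gossip.Gossip, desc *roachpb.RangeDescriptor) ReplicaSlice {
	// Learners can't serve reads or writes, so don't route to them.
	return NewReplicaSlice(g, desc.Replicas().Voters())
}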
6 changes: 3 additions & 3 deletions pkg/roachpb/metadata.go
@@ -220,7 +220,7 @@ func (r RangeDescriptor) Validate() error {
 		return errors.Errorf("NextReplicaID must be non-zero")
 	}
 	seen := map[ReplicaID]struct{}{}
-	for i, rep := range r.Replicas().Unwrap() {
+	for i, rep := range r.Replicas().All() {
 		if err := rep.Validate(); err != nil {
 			return errors.Errorf("replica %d is invalid: %s", i, err)
 		}
@@ -247,8 +247,8 @@ func (r RangeDescriptor) String() string {
 	}
 	buf.WriteString(" [")
 
-	if len(r.Replicas().Unwrap()) > 0 {
-		for i, rep := range r.Replicas().Unwrap() {
+	if allReplicas := r.Replicas().All(); len(allReplicas) > 0 {
+		for i, rep := range allReplicas {
 			if i > 0 {
 				buf.WriteString(", ")
 			}
2 changes: 1 addition & 1 deletion pkg/roachpb/metadata_replicas.go
@@ -37,7 +37,7 @@ func (d ReplicaDescriptors) Unwrap() []ReplicaDescriptor {
 }
 
 // All returns every replica in the set, including both voter replicas and
-// learner replicas.
+// learner replicas. Voter replicas are ordered first in the returned slice.
 func (d ReplicaDescriptors) All() []ReplicaDescriptor {
 	return d.wrapped
 }
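Given the ordering invariant documented above (voters first), a plausible implementation of the companion Voters accessor is to return the voter prefix of the wrapped slice. This is a sketch, not necessarily the tree's code; the GetType/LEARNER names are assumptions.

// Sketch only: relies on d.wrapped keeping voters ordered first and on a
// learner-indicating replica type; the method and constant names are assumed.
func (d ReplicaDescriptors) Voters() []ReplicaDescriptor {
	for i := range d.wrapped {
		if d.wrapped[i].GetType() == LEARNER {
			// Everything from the first learner onward is a learner.
			return d.wrapped[:i]
		}
	}
	return d.wrapped
}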
2 changes: 1 addition & 1 deletion pkg/server/admin.go
@@ -663,7 +663,7 @@ func (s *adminServer) statsForSpan(
 			if err := kv.Value.GetProto(&rng); err != nil {
 				return nil, s.serverError(err)
 			}
-			for _, repl := range rng.Replicas().Unwrap() {
+			for _, repl := range rng.Replicas().All() {
 				nodeIDs[repl.NodeID] = struct{}{}
 			}
 		}
2 changes: 1 addition & 1 deletion pkg/server/status.go
@@ -1125,7 +1125,7 @@ func (s *statusServer) RaftDebug(
 			desc := node.Range.State.Desc
 			// Check for whether replica should be GCed.
 			containsNode := false
-			for _, replica := range desc.Replicas().Unwrap() {
+			for _, replica := range desc.Replicas().All() {
 				if replica.NodeID == node.NodeID {
 					containsNode = true
 				}
7 changes: 5 additions & 2 deletions pkg/sql/crdb_internal.go
@@ -1714,7 +1714,7 @@ CREATE TABLE crdb_internal.ranges_no_leases (
   database_name STRING NOT NULL,
   table_name STRING NOT NULL,
   index_name STRING NOT NULL,
-  replicas INT[] NOT NULL,
+  replicas INT[] NOT NULL,
   split_enforced_until TIMESTAMP
 )
 `,
@@ -1768,8 +1768,11 @@ CREATE TABLE crdb_internal.ranges_no_leases (
 			return nil, err
 		}
 
+		// TODO(dan): We're trying to treat learners as a far-behind replica as
+		// much as possible, so just include them in the list of replicas. We can
+		// add a separate column for them if we get feedback about it.
 		var replicas []int
-		for _, rd := range desc.Replicas().Unwrap() {
+		for _, rd := range desc.Replicas().All() {
 			replicas = append(replicas, int(rd.StoreID))
 		}
 		sort.Ints(replicas)
8 changes: 6 additions & 2 deletions pkg/sql/distsqlplan/replicaoracle/oracle.go
@@ -268,11 +268,15 @@ func (o *binPackingOracle) ChoosePreferredReplica(
 // is available in gossip. If no nodes are available, a RangeUnavailableError is
 // returned.
 func replicaSliceOrErr(desc roachpb.RangeDescriptor, gsp *gossip.Gossip) (kv.ReplicaSlice, error) {
-	replicas := kv.NewReplicaSlice(gsp, &desc)
+	// Learner replicas won't serve reads/writes, so send only to the `Voters`
+	// replicas. This is just an optimization to save a network hop, everything
+	// would still work if we had `All` here.
+	voterReplicas := desc.Replicas().Voters()
+	replicas := kv.NewReplicaSlice(gsp, voterReplicas)
 	if len(replicas) == 0 {
 		// We couldn't get node descriptors for any replicas.
 		var nodeIDs []roachpb.NodeID
-		for _, r := range desc.Replicas().Unwrap() {
+		for _, r := range voterReplicas {
 			nodeIDs = append(nodeIDs, r.NodeID)
 		}
 		return kv.ReplicaSlice{}, sqlbase.NewRangeUnavailableError(
2 changes: 1 addition & 1 deletion pkg/storage/consistency_queue.go
@@ -82,7 +82,7 @@ func (q *consistencyQueue) shouldQueue(
 	}
 	// Check if all replicas are live. Some tests run without a NodeLiveness configured.
 	if repl.store.cfg.NodeLiveness != nil {
-		for _, rep := range repl.Desc().Replicas().Unwrap() {
+		for _, rep := range repl.Desc().Replicas().All() {
 			if live, err := repl.store.cfg.NodeLiveness.IsLive(rep.NodeID); err != nil {
 				log.VErrEventf(ctx, 3, "node %d liveness failed: %s", rep.NodeID, err)
 				return false, 0
8 changes: 6 additions & 2 deletions pkg/storage/replica.go
@@ -727,12 +727,14 @@ func (r *Replica) GetGCThreshold() hlc.Timestamp {
 	return *r.mu.state.GCThreshold
 }
 
+// maxReplicaID returns the maximum ReplicaID of any replica, including voters
+// and learners.
 func maxReplicaID(desc *roachpb.RangeDescriptor) roachpb.ReplicaID {
 	if desc == nil || !desc.IsInitialized() {
 		return 0
 	}
 	var maxID roachpb.ReplicaID
-	for _, repl := range desc.Replicas().Unwrap() {
+	for _, repl := range desc.Replicas().All() {
 		if repl.ReplicaID > maxID {
 			maxID = repl.ReplicaID
 		}
@@ -903,7 +905,9 @@ func (r *Replica) State() storagepb.RangeInfo {
 	}
 	ri.RangeMaxBytes = *r.mu.zone.RangeMaxBytes
 	if desc := ri.ReplicaState.Desc; desc != nil {
-		for _, replDesc := range desc.Replicas().Unwrap() {
+		// Learner replicas don't serve follower reads, but they still receive
+		// closed timestamp updates, so include them here.
+		for _, replDesc := range desc.Replicas().All() {
 			r.store.cfg.ClosedTimestamp.Storage.VisitDescending(replDesc.NodeID, func(e ctpb.Entry) (done bool) {
 				mlai, found := e.MLAI[r.RangeID]
 				if !found {
17 changes: 10 additions & 7 deletions pkg/storage/replica_command.go
@@ -870,7 +870,7 @@ func (r *Replica) changeReplicas(
 	}
 	repDescIdx := -1  // tracks NodeID && StoreID
 	nodeUsed := false // tracks NodeID only
-	for i, existingRep := range desc.Replicas().Unwrap() {
+	for i, existingRep := range desc.Replicas().All() {
 		nodeUsedByExistingRep := existingRep.NodeID == repDesc.NodeID
 		nodeUsed = nodeUsed || nodeUsedByExistingRep
 
@@ -1349,7 +1349,7 @@ func (s *Store) AdminRelocateRange(
 	var addTargets []roachpb.ReplicaDescriptor
 	for _, t := range targets {
 		found := false
-		for _, replicaDesc := range rangeDesc.Replicas().Unwrap() {
+		for _, replicaDesc := range rangeDesc.Replicas().All() {
 			if replicaDesc.StoreID == t.StoreID && replicaDesc.NodeID == t.NodeID {
 				found = true
 				break
@@ -1364,7 +1364,7 @@
 	}
 
 	var removeTargets []roachpb.ReplicaDescriptor
-	for _, replicaDesc := range rangeDesc.Replicas().Unwrap() {
+	for _, replicaDesc := range rangeDesc.Replicas().All() {
 		found := false
 		for _, t := range targets {
 			if replicaDesc.StoreID == t.StoreID && replicaDesc.NodeID == t.NodeID {
@@ -1420,7 +1420,7 @@ func (s *Store) AdminRelocateRange(
 			})
 			desc.NextReplicaID++
 		case roachpb.REMOVE_REPLICA:
-			newReplicas := removeTargetFromSlice(desc.Replicas().Unwrap(), target)
+			newReplicas := removeTargetFromSlice(desc.Replicas().All(), target)
 			desc.SetReplicas(roachpb.MakeReplicaDescriptors(newReplicas))
 		default:
 			panic(errors.Errorf("unknown ReplicaChangeType %v", changeType))
@@ -1480,7 +1480,7 @@ func (s *Store) AdminRelocateRange(
 			ctx,
 			storeList,
 			zone,
-			rangeInfo.Desc.Replicas().Unwrap(),
+			rangeInfo.Desc.Replicas().All(),
 			rangeInfo,
 			s.allocator.scorerOptions())
 		if targetStore == nil {
@@ -1618,8 +1618,11 @@ func (r *Replica) adminScatter(
 	// probability 1/N of choosing each.
 	if args.RandomizeLeases && r.OwnsValidLease(r.store.Clock().Now()) {
 		desc := r.Desc()
-		newLeaseholderIdx := rand.Intn(len(desc.Replicas().Unwrap()))
-		targetStoreID := desc.Replicas().Unwrap()[newLeaseholderIdx].StoreID
+		// Learner replicas aren't allowed to become the leaseholder or raft leader,
+		// so only consider the `Voters` replicas.
+		voterReplicas := desc.Replicas().Voters()
+		newLeaseholderIdx := rand.Intn(len(voterReplicas))
+		targetStoreID := voterReplicas[newLeaseholderIdx].StoreID
 		if targetStoreID != r.store.StoreID() {
 			if err := r.AdminTransferLease(ctx, targetStoreID); err != nil {
 				log.Warningf(ctx, "failed to scatter lease to s%d: %+v", targetStoreID, err)
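The adminScatter hunk picks the lease target uniformly at random from the voters, giving each a 1/N chance. A self-contained sketch of that selection with simplified types (only the voters-only restriction comes from the commit):

package main

import (
	"fmt"
	"math/rand"
)

// storeID stands in for roachpb.StoreID. Learners never appear in the
// candidate slice because they can't hold the lease or lead raft.
type storeID int

func pickLeaseTarget(voters []storeID, rng *rand.Rand) storeID {
	// Uniform choice: each voter is selected with probability 1/len(voters).
	return voters[rng.Intn(len(voters))]
}

func main() {
	rng := rand.New(rand.NewSource(42))
	voters := []storeID{1, 2, 3}
	counts := map[storeID]int{}
	for i := 0; i < 3000; i++ {
		counts[pickLeaseTarget(voters, rng)]++
	}
	fmt.Println(counts) // roughly 1000 per store: uniform over voters
}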
23 changes: 14 additions & 9 deletions pkg/storage/replica_metrics.go
@@ -156,7 +156,10 @@ func calcRangeCounter(
 	numReplicas int32,
 	clusterNodes int,
 ) (rangeCounter, unavailable, underreplicated, overreplicated bool) {
-	for _, rd := range desc.Replicas().Unwrap() {
+	// It seems unlikely that a learner replica would be the first live one, but
+	// there's no particular reason to exclude them. Note that `All` returns the
+	// voters first.
+	for _, rd := range desc.Replicas().All() {
 		if livenessMap[rd.NodeID].IsLive {
 			rangeCounter = rd.StoreID == storeID
 			break
@@ -165,25 +168,27 @@
 	// We also compute an estimated per-range count of under-replicated and
 	// unavailable ranges for each range based on the liveness table.
 	if rangeCounter {
-		liveReplicas := calcLiveReplicas(desc, livenessMap)
-		if liveReplicas < desc.Replicas().QuorumSize() {
+		liveVoterReplicas := calcLiveVoterReplicas(desc, livenessMap)
+		if liveVoterReplicas < desc.Replicas().QuorumSize() {
 			unavailable = true
 		}
 		needed := GetNeededReplicas(numReplicas, clusterNodes)
-		if needed > liveReplicas {
+		if needed > liveVoterReplicas {
 			underreplicated = true
-		} else if needed < liveReplicas {
+		} else if needed < liveVoterReplicas {
 			overreplicated = true
 		}
 	}
 	return
 }
 
-// calcLiveReplicas returns a count of the live replicas; a live replica is
-// determined by checking its node in the provided liveness map.
-func calcLiveReplicas(desc *roachpb.RangeDescriptor, livenessMap IsLiveMap) int {
+// calcLiveVoterReplicas returns a count of the live voter replicas; a live
+// replica is determined by checking its node in the provided liveness map. This
+// method is used when indicating under-replication so only voter replicas are
+// considered.
+func calcLiveVoterReplicas(desc *roachpb.RangeDescriptor, livenessMap IsLiveMap) int {
 	var live int
-	for _, rd := range desc.Replicas().Unwrap() {
+	for _, rd := range desc.Replicas().Voters() {
 		if livenessMap[rd.NodeID].IsLive {
 			live++
 		}
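To make the metric arithmetic above concrete, here is a worked sketch with made-up numbers; quorumSize and the constants are simplified stand-ins for desc.Replicas().QuorumSize() and GetNeededReplicas:

package main

import "fmt"

// quorumSize mirrors the usual raft majority rule for n voting replicas.
func quorumSize(voters int) int { return voters/2 + 1 }

func main() {
	voters := 5     // voting replicas in the descriptor
	liveVoters := 2 // voters whose nodes the liveness map reports as live
	needed := 3     // zone config target, cf. GetNeededReplicas

	unavailable := liveVoters < quorumSize(voters) // 2 < 3: quorum lost
	underreplicated := needed > liveVoters         // 3 > 2: true
	overreplicated := needed < liveVoters          // 3 < 2: false

	fmt.Println(unavailable, underreplicated, overreplicated) // true true false
}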
2 changes: 1 addition & 1 deletion pkg/storage/replica_raft_quiesce.go
@@ -255,7 +255,7 @@ func shouldReplicaQuiesce(
 	}
 
 	var foundSelf bool
-	for _, rep := range q.descRLocked().Replicas().Unwrap() {
+	for _, rep := range q.descRLocked().Replicas().All() {
 		if uint64(rep.ReplicaID) == status.ID {
 			foundSelf = true
 		}
4 changes: 2 additions & 2 deletions pkg/storage/replica_raftstorage.go
@@ -66,7 +66,7 @@ func (r *replicaRaftStorage) InitialState() (raftpb.HardState, raftpb.ConfState,
 		return raftpb.HardState{}, raftpb.ConfState{}, err
 	}
 	var cs raftpb.ConfState
-	for _, rep := range r.mu.state.Desc.Replicas().Unwrap() {
+	for _, rep := range r.mu.state.Desc.Replicas().All() {
 		cs.Nodes = append(cs.Nodes, uint64(rep.ReplicaID))
 	}
 
@@ -536,7 +536,7 @@ func snapshot(
 
 	// Synthesize our raftpb.ConfState from desc.
 	var cs raftpb.ConfState
-	for _, rep := range desc.Replicas().Unwrap() {
+	for _, rep := range desc.Replicas().All() {
 		cs.Nodes = append(cs.Nodes, uint64(rep.ReplicaID))
 	}
 