From c6ade7db1897d84594fa16f3b577ab08c7afbfe6 Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Wed, 12 Dec 2018 02:13:22 -0800 Subject: [PATCH 1/2] storage: refactor GetNeededReplicas to use a notion of available nodes This is the commit from #30559 but without all the changes that aren't necessary to enable backporting #32949. Release note: None --- pkg/server/server.go | 1 + pkg/server/status.go | 10 ++-- pkg/storage/allocator.go | 26 ++++++---- pkg/storage/allocator_test.go | 92 ++++++++++++--------------------- pkg/storage/replica.go | 10 ++-- pkg/storage/replica_test.go | 4 +- pkg/storage/replicate_queue.go | 6 +-- pkg/storage/store.go | 9 ++-- pkg/storage/store_pool.go | 25 +++++++++ pkg/storage/store_rebalancer.go | 5 +- 10 files changed, 97 insertions(+), 91 deletions(-) diff --git a/pkg/server/server.go b/pkg/server/server.go index c428a7371744..ea2adbb3ce6d 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -545,6 +545,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { s.gossip, s.recorder, s.nodeLiveness, + s.storePool, s.rpcContext, s.node.stores, s.stopper, diff --git a/pkg/server/status.go b/pkg/server/status.go index 1ea9cb9ebdcb..1faea27c82a0 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -35,6 +35,7 @@ import ( "strings" "sync" + "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/storage/engine" gwruntime "github.com/grpc-ecosystem/grpc-gateway/runtime" "github.com/pkg/errors" @@ -46,7 +47,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/build" - "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/keys" @@ -130,6 +130,7 @@ type statusServer struct { gossip *gossip.Gossip metricSource metricMarshaler nodeLiveness *storage.NodeLiveness + storePool *storage.StorePool rpcCtx *rpc.Context stores *storage.Stores stopper *stop.Stopper @@ -146,6 +147,7 @@ func newStatusServer( gossip *gossip.Gossip, metricSource metricMarshaler, nodeLiveness *storage.NodeLiveness, + storePool *storage.StorePool, rpcCtx *rpc.Context, stores *storage.Stores, stopper *stop.Stopper, @@ -161,6 +163,7 @@ func newStatusServer( gossip: gossip, metricSource: metricSource, nodeLiveness: nodeLiveness, + storePool: storePool, rpcCtx: rpcCtx, stores: stores, stopper: stopper, @@ -1230,6 +1233,7 @@ func (s *statusServer) Ranges( cfg = config.SystemConfig{} } isLiveMap := s.nodeLiveness.GetIsLiveMap() + availableNodes := s.storePool.AvailableNodeCount() err = s.stores.VisitStores(func(store *storage.Store) error { timestamp := store.Clock().Now() @@ -1249,7 +1253,7 @@ func (s *statusServer) Ranges( desc, rep, store.Ident.StoreID, - rep.Metrics(ctx, timestamp, cfg, isLiveMap), + rep.Metrics(ctx, timestamp, cfg, isLiveMap, availableNodes), )) return false, nil }) @@ -1269,7 +1273,7 @@ func (s *statusServer) Ranges( *desc, rep, store.Ident.StoreID, - rep.Metrics(ctx, timestamp, cfg, isLiveMap), + rep.Metrics(ctx, timestamp, cfg, isLiveMap, availableNodes), )) } return nil diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go index b6e1e82887d2..d500999c5108 100644 --- a/pkg/storage/allocator.go +++ b/pkg/storage/allocator.go @@ -239,17 +239,19 @@ func MakeAllocator( } } -// GetNeededReplicas calculates the number of replicas a range should have given its zone config and -// dynamic up and down replication. 
-func GetNeededReplicas( - zoneConfigReplicaCount int32, aliveReplicas int, decommissioningReplicas int, -) int { +// GetNeededReplicas calculates the number of replicas a range should +// have given its zone config and the number of nodes available for +// up-replication (i.e. live and not decommissioning). +func GetNeededReplicas(zoneConfigReplicaCount int32, availableNodes int) int { numZoneReplicas := int(zoneConfigReplicaCount) need := numZoneReplicas - // We're adjusting the replication factor all ranges so that if there are less nodes than - // replicas specified in the zone config, the cluster can still function. - need = int(math.Min(float64(aliveReplicas-decommissioningReplicas), float64(need))) + // Adjust the replication factor for all ranges if there are fewer + // nodes than replicas specified in the zone config, so the cluster + // can still function. + if availableNodes < need { + need = availableNodes + } // Ensure that we don't up- or down-replicate to an even number of replicas // unless an even number of replicas was specifically requested by the user @@ -264,7 +266,9 @@ func GetNeededReplicas( if need%2 == 0 { need = need - 1 } - need = int(math.Max(3.0, float64(need))) + if need < 3 { + need = 3 + } if need > numZoneReplicas { need = numZoneReplicas } @@ -286,8 +290,8 @@ func (a *Allocator) ComputeAction( have := len(rangeInfo.Desc.Replicas) decommissioningReplicas := a.storePool.decommissioningReplicas(rangeInfo.Desc.RangeID, rangeInfo.Desc.Replicas) - _, aliveStoreCount, _ := a.storePool.getStoreList(rangeInfo.Desc.RangeID, storeFilterNone) - need := GetNeededReplicas(zone.NumReplicas, aliveStoreCount, len(decommissioningReplicas)) + availableNodes := a.storePool.AvailableNodeCount() + need := GetNeededReplicas(zone.NumReplicas, availableNodes) desiredQuorum := computeQuorum(need) quorum := computeQuorum(have) diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go index f9381c6b65bb..ead55335fe5e 100644 --- a/pkg/storage/allocator_test.go +++ b/pkg/storage/allocator_test.go @@ -4895,72 +4895,48 @@ func TestAllocatorGetNeededReplicas(t *testing.T) { testCases := []struct { zoneRepls int32 - aliveRepls int - decomRepls int + availNodes int expected int }{ // If zone.NumReplicas <= 3, GetNeededReplicas should always return zone.NumReplicas. - {1, 0, 0, 1}, - {1, 1, 0, 1}, - {1, 1, 1, 1}, - {1, 0, 1, 1}, - {2, 0, 0, 2}, - {2, 1, 0, 2}, - {2, 2, 0, 2}, - {2, 2, 2, 2}, - {3, 0, 0, 3}, - {3, 1, 0, 3}, - {3, 3, 0, 3}, - {3, 3, 2, 3}, + {1, 0, 1}, + {1, 1, 1}, + {2, 0, 2}, + {2, 1, 2}, + {2, 2, 2}, + {3, 0, 3}, + {3, 1, 3}, + {3, 3, 3}, // Things get more involved when zone.NumReplicas > 3. 
- {4, 1, 0, 3}, - {4, 2, 0, 3}, - {4, 3, 0, 3}, - {4, 4, 0, 4}, - {4, 4, 1, 3}, - {4, 4, 2, 3}, - {4, 4, 3, 3}, - {5, 1, 0, 3}, - {5, 2, 0, 3}, - {5, 3, 0, 3}, - {5, 4, 0, 3}, - {5, 5, 0, 5}, - {5, 5, 1, 3}, - {5, 5, 2, 3}, - {5, 5, 3, 3}, - {6, 1, 0, 3}, - {6, 2, 0, 3}, - {6, 3, 0, 3}, - {6, 4, 0, 3}, - {6, 5, 0, 5}, - {6, 6, 0, 6}, - {6, 6, 1, 5}, - {6, 6, 2, 3}, - {6, 5, 1, 3}, - {6, 5, 2, 3}, - {6, 5, 3, 3}, - {7, 1, 0, 3}, - {7, 2, 0, 3}, - {7, 3, 0, 3}, - {7, 4, 0, 3}, - {7, 5, 0, 5}, - {7, 6, 0, 5}, - {7, 7, 0, 7}, - {7, 7, 1, 5}, - {7, 7, 2, 5}, - {7, 7, 3, 3}, - {7, 6, 1, 5}, - {7, 6, 2, 3}, - {7, 5, 1, 3}, - {7, 4, 1, 3}, - {7, 3, 1, 3}, + {4, 1, 3}, + {4, 2, 3}, + {4, 3, 3}, + {4, 4, 4}, + {5, 1, 3}, + {5, 2, 3}, + {5, 3, 3}, + {5, 4, 3}, + {5, 5, 5}, + {6, 1, 3}, + {6, 2, 3}, + {6, 3, 3}, + {6, 4, 3}, + {6, 5, 5}, + {6, 6, 6}, + {7, 1, 3}, + {7, 2, 3}, + {7, 3, 3}, + {7, 4, 3}, + {7, 5, 5}, + {7, 6, 5}, + {7, 7, 7}, } for _, tc := range testCases { - if e, a := tc.expected, GetNeededReplicas(tc.zoneRepls, tc.aliveRepls, tc.decomRepls); e != a { + if e, a := tc.expected, GetNeededReplicas(tc.zoneRepls, tc.availNodes); e != a { t.Errorf( - "GetNeededReplicas(zone.NumReplicas=%d, aliveReplicas=%d, decomReplicas=%d) got %d; want %d", - tc.zoneRepls, tc.aliveRepls, tc.decomRepls, a, e) + "GetNeededReplicas(zone.NumReplicas=%d, availNodes=%d) got %d; want %d", + tc.zoneRepls, tc.availNodes, a, e) } } } diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index f3c8db731db1..74b0db5a732d 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -6864,6 +6864,7 @@ func (r *Replica) Metrics( now hlc.Timestamp, cfg config.SystemConfig, livenessMap map[roachpb.NodeID]bool, + availableNodes int, ) ReplicaMetrics { r.mu.RLock() raftStatus := r.raftStatusRLocked() @@ -6887,6 +6888,7 @@ func (r *Replica) Metrics( &r.store.cfg.RaftConfig, cfg, livenessMap, + availableNodes, desc, raftStatus, leaseStatus, @@ -6896,7 +6898,6 @@ func (r *Replica) Metrics( cmdQMetricsLocal, cmdQMetricsGlobal, raftLogSize, - r.store.allocator.storePool, ) } @@ -6915,6 +6916,7 @@ func calcReplicaMetrics( raftCfg *base.RaftConfig, cfg config.SystemConfig, livenessMap map[roachpb.NodeID]bool, + availableNodes int, desc *roachpb.RangeDescriptor, raftStatus *raft.Status, leaseStatus LeaseStatus, @@ -6924,7 +6926,6 @@ func calcReplicaMetrics( cmdQMetricsLocal CommandQueueMetrics, cmdQMetricsGlobal CommandQueueMetrics, raftLogSize int64, - storePool *StorePool, ) ReplicaMetrics { var m ReplicaMetrics @@ -6967,10 +6968,7 @@ func calcReplicaMetrics( if zoneConfig, err := cfg.GetZoneConfigForKey(desc.StartKey); err != nil { log.Error(ctx, err) } else { - decommissioningReplicas := len(storePool.decommissioningReplicas(desc.RangeID, desc.Replicas)) - _, aliveStoreCount, _ := storePool.getStoreList(desc.RangeID, storeFilterNone) - - if GetNeededReplicas(zoneConfig.NumReplicas, aliveStoreCount, decommissioningReplicas) > liveReplicas { + if GetNeededReplicas(zoneConfig.NumReplicas, availableNodes) > liveReplicas { m.Underreplicated = true } } diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 9e1e0f1ccbb3..7f024ac009f6 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -9140,8 +9140,8 @@ func TestReplicaMetrics(t *testing.T) { c.expected.Ticking = !c.expected.Quiescent metrics := calcReplicaMetrics( context.Background(), hlc.Timestamp{}, &cfg.RaftConfig, config.SystemConfig{}, - c.liveness, &c.desc, c.raftStatus, LeaseStatus{}, - c.storeID, 
c.expected.Quiescent, c.expected.Ticking, CommandQueueMetrics{}, CommandQueueMetrics{}, c.raftLogSize, tc.store.allocator.storePool) + c.liveness, 0, &c.desc, c.raftStatus, LeaseStatus{}, + c.storeID, c.expected.Quiescent, c.expected.Ticking, CommandQueueMetrics{}, CommandQueueMetrics{}, c.raftLogSize) if c.expected != metrics { t.Fatalf("unexpected metrics:\n%s", pretty.Diff(c.expected, metrics)) } diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go index 0dae089141a1..35868ce5eed7 100644 --- a/pkg/storage/replicate_queue.go +++ b/pkg/storage/replicate_queue.go @@ -320,10 +320,8 @@ func (rq *replicateQueue) processOneChange( StoreID: newStore.StoreID, } - decommissioningReplicas := len(rq.allocator.storePool.decommissioningReplicas(desc.RangeID, desc.Replicas)) - _, aliveStoreCount, _ := rq.allocator.storePool.getStoreList(desc.RangeID, storeFilterNone) - - need := GetNeededReplicas(zone.NumReplicas, aliveStoreCount, decommissioningReplicas) + availableNodes := rq.allocator.storePool.AvailableNodeCount() + need := GetNeededReplicas(zone.NumReplicas, availableNodes) willHave := len(desc.Replicas) + 1 // Only up-replicate if there are suitable allocation targets such diff --git a/pkg/storage/store.go b/pkg/storage/store.go index af926307c083..cb503629e250 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -139,9 +139,9 @@ func TestStoreConfig(clock *hlc.Clock) StoreConfig { } st := cluster.MakeTestingClusterSettings() sc := StoreConfig{ - Settings: st, - AmbientCtx: log.AmbientContext{Tracer: st.Tracer}, - Clock: clock, + Settings: st, + AmbientCtx: log.AmbientContext{Tracer: st.Tracer}, + Clock: clock, CoalescedHeartbeatsInterval: 50 * time.Millisecond, RaftHeartbeatIntervalTicks: 1, ScanInterval: 10 * time.Minute, @@ -4319,7 +4319,8 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { } newStoreReplicaVisitor(s).Visit(func(rep *Replica) bool { - metrics := rep.Metrics(ctx, timestamp, cfg, livenessMap) + availableNodes := s.cfg.StorePool.AvailableNodeCount() + metrics := rep.Metrics(ctx, timestamp, cfg, livenessMap, availableNodes) if metrics.Leader { raftLeaderCount++ if metrics.LeaseValid && !metrics.Leaseholder { diff --git a/pkg/storage/store_pool.go b/pkg/storage/store_pool.go index b9f858f3b52b..f0030c46ad15 100644 --- a/pkg/storage/store_pool.go +++ b/pkg/storage/store_pool.go @@ -435,6 +435,31 @@ func (sp *StorePool) decommissioningReplicas( return } +// AvailableNodeCount returns the number of nodes which are +// considered available for use as allocation targets. This includes +// only live nodes which are not decommissioning. +func (sp *StorePool) AvailableNodeCount() int { + sp.detailsMu.RLock() + defer sp.detailsMu.RUnlock() + + now := sp.clock.PhysicalTime() + availableNodes := map[roachpb.NodeID]struct{}{} + timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) + + for _, detail := range sp.detailsMu.storeDetails { + switch s := detail.status(now, timeUntilStoreDead, 0, sp.nodeLivenessFn); s { + case storeStatusThrottled, storeStatusAvailable: + availableNodes[detail.desc.Node.NodeID] = struct{}{} + case storeStatusReplicaCorrupted, storeStatusDead, storeStatusUnknown, storeStatusDecommissioning: + // Do nothing; this node cannot be used. + default: + panic(fmt.Sprintf("unknown store status: %d", s)) + } + } + + return len(availableNodes) +} + // liveAndDeadReplicas divides the provided repls slice into two slices: the // first for live replicas, and the second for dead replicas. 
// Replicas for which liveness or deadness cannot be ascertained are excluded diff --git a/pkg/storage/store_rebalancer.go b/pkg/storage/store_rebalancer.go index 72a9ba414b61..9c207b1fa4fa 100644 --- a/pkg/storage/store_rebalancer.go +++ b/pkg/storage/store_rebalancer.go @@ -524,10 +524,9 @@ func (sr *StoreRebalancer) chooseReplicaToRebalance( log.Error(ctx, err) return replicaWithStats{}, nil } - decommissioningReplicas := len(sr.rq.allocator.storePool.decommissioningReplicas(desc.RangeID, desc.Replicas)) - _, aliveStoreCount, _ := sr.rq.allocator.storePool.getStoreList(desc.RangeID, storeFilterNone) + availableNodes := sr.rq.allocator.storePool.AvailableNodeCount() + desiredReplicas := GetNeededReplicas(zone.NumReplicas, availableNodes) - desiredReplicas := GetNeededReplicas(zone.NumReplicas, aliveStoreCount, decommissioningReplicas) targets := make([]roachpb.ReplicationTarget, 0, desiredReplicas) targetReplicas := make([]roachpb.ReplicaDescriptor, 0, desiredReplicas) From 4aa0b528dcef2c6a729abc32efaf7a5ce17a074d Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Fri, 7 Dec 2018 17:23:54 -0800 Subject: [PATCH 2/2] storage: Fix GetNeededReplicas to count non-live, non-dead nodes Release note (bug fix): Fixes issue where ranges with high replication factors would be incorrectly down-replicated if enough nodes stop running that the number of desired replicas is greater than the number of running nodes. This issue could cause under-replication and potentially unavailability. --- pkg/storage/allocator.go | 2 +- pkg/storage/allocator_test.go | 83 ++++++++++++++++++++++++++++++++--- pkg/storage/store.go | 6 +-- pkg/storage/store_pool.go | 16 ++++--- 4 files changed, 91 insertions(+), 16 deletions(-) diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go index d500999c5108..8639ee9caa8f 100644 --- a/pkg/storage/allocator.go +++ b/pkg/storage/allocator.go @@ -241,7 +241,7 @@ func MakeAllocator( // GetNeededReplicas calculates the number of replicas a range should // have given its zone config and the number of nodes available for -// up-replication (i.e. live and not decommissioning). +// up-replication (i.e. not dead and not decommissioning). func GetNeededReplicas(zoneConfigReplicaCount int32, availableNodes int) int { numZoneReplicas := int(zoneConfigReplicaCount) need := numZoneReplicas diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go index ead55335fe5e..64a404b04f1d 100644 --- a/pkg/storage/allocator_test.go +++ b/pkg/storage/allocator_test.go @@ -329,7 +329,11 @@ func createTestAllocator( // ranges in deadReplicas. 
func mockStorePool( storePool *StorePool, - aliveStoreIDs, deadStoreIDs, decommissioningStoreIDs, decommissionedStoreIDs []roachpb.StoreID, + aliveStoreIDs []roachpb.StoreID, + unavailableStoreIDs []roachpb.StoreID, + deadStoreIDs []roachpb.StoreID, + decommissioningStoreIDs []roachpb.StoreID, + decommissionedStoreIDs []roachpb.StoreID, deadReplicas []roachpb.ReplicaIdent, ) { storePool.detailsMu.Lock() @@ -345,6 +349,14 @@ func mockStorePool( Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(storeID)}, } } + for _, storeID := range unavailableStoreIDs { + liveNodeSet[roachpb.NodeID(storeID)] = NodeLivenessStatus_UNAVAILABLE + detail := storePool.getStoreDetailLocked(storeID) + detail.desc = &roachpb.StoreDescriptor{ + StoreID: storeID, + Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(storeID)}, + } + } for _, storeID := range deadStoreIDs { liveNodeSet[roachpb.NodeID(storeID)] = NodeLivenessStatus_DEAD detail := storePool.getStoreDetailLocked(storeID) @@ -923,6 +935,7 @@ func TestAllocatorRebalanceDeadNodes(t *testing.T) { mockStorePool( sp, []roachpb.StoreID{1, 2, 3, 4, 5, 6}, + nil, []roachpb.StoreID{7, 8}, nil, nil, @@ -4460,6 +4473,7 @@ func TestAllocatorComputeAction(t *testing.T) { // is dead. mockStorePool(sp, []roachpb.StoreID{1, 2, 3, 4, 5, 8}, + nil, []roachpb.StoreID{6, 7}, nil, nil, @@ -4569,7 +4583,7 @@ func TestAllocatorComputeActionRemoveDead(t *testing.T) { defer stopper.Stop(ctx) for i, tcase := range testCases { - mockStorePool(sp, tcase.live, tcase.dead, nil, nil, nil) + mockStorePool(sp, tcase.live, nil, tcase.dead, nil, nil, nil) action, _ := a.ComputeAction(ctx, zone, RangeInfo{Desc: &tcase.desc}) if tcase.expectedAction != action { @@ -4790,7 +4804,7 @@ func TestAllocatorComputeActionDecommission(t *testing.T) { defer stopper.Stop(ctx) for i, tcase := range testCases { - mockStorePool(sp, tcase.live, tcase.dead, tcase.decommissioning, tcase.decommissioned, nil) + mockStorePool(sp, tcase.live, nil, tcase.dead, tcase.decommissioning, tcase.decommissioned, nil) action, _ := a.ComputeAction(ctx, tcase.zone, RangeInfo{Desc: &tcase.desc}) if tcase.expectedAction != action { @@ -4807,14 +4821,15 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) { storeList []roachpb.StoreID expectedAction AllocatorAction live []roachpb.StoreID + unavailable []roachpb.StoreID dead []roachpb.StoreID decommissioning []roachpb.StoreID - decommissioned []roachpb.StoreID }{ { storeList: []roachpb.StoreID{1, 2, 3, 4}, expectedAction: AllocatorRemoveDecommissioning, live: []roachpb.StoreID{4}, + unavailable: nil, dead: nil, decommissioning: []roachpb.StoreID{1, 2, 3}, }, @@ -4822,6 +4837,7 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) { storeList: []roachpb.StoreID{1, 2, 3}, expectedAction: AllocatorAdd, live: []roachpb.StoreID{4, 5}, + unavailable: nil, dead: nil, decommissioning: []roachpb.StoreID{1, 2, 3}, }, @@ -4829,6 +4845,7 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) { storeList: []roachpb.StoreID{1, 2, 3, 4}, expectedAction: AllocatorRemoveDead, live: []roachpb.StoreID{1, 2, 3, 5}, + unavailable: nil, dead: []roachpb.StoreID{4}, decommissioning: nil, }, @@ -4836,6 +4853,7 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) { storeList: []roachpb.StoreID{1, 4}, expectedAction: AllocatorAdd, live: []roachpb.StoreID{1, 2, 3, 5}, + unavailable: nil, dead: []roachpb.StoreID{4}, decommissioning: nil, }, @@ -4843,6 +4861,7 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) { storeList: 
[]roachpb.StoreID{1, 2, 3}, expectedAction: AllocatorConsiderRebalance, live: []roachpb.StoreID{1, 2, 3, 4}, + unavailable: nil, dead: nil, decommissioning: nil, }, @@ -4850,6 +4869,7 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) { storeList: []roachpb.StoreID{1, 2}, expectedAction: AllocatorAdd, live: []roachpb.StoreID{1, 2}, + unavailable: nil, dead: nil, decommissioning: nil, }, @@ -4857,6 +4877,7 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) { storeList: []roachpb.StoreID{1, 2, 3}, expectedAction: AllocatorConsiderRebalance, live: []roachpb.StoreID{1, 2, 3}, + unavailable: nil, dead: nil, decommissioning: nil, }, @@ -4864,9 +4885,58 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) { storeList: []roachpb.StoreID{1, 2, 3, 4}, expectedAction: AllocatorRemove, live: []roachpb.StoreID{1, 2, 3, 4}, + unavailable: nil, dead: nil, decommissioning: nil, }, + { + storeList: []roachpb.StoreID{1, 2, 3, 4, 5}, + expectedAction: AllocatorConsiderRebalance, + live: []roachpb.StoreID{1, 2, 3, 4, 5}, + unavailable: nil, + dead: nil, + decommissioning: nil, + }, + { + storeList: []roachpb.StoreID{1, 2, 3, 4, 5}, + expectedAction: AllocatorConsiderRebalance, + live: []roachpb.StoreID{1, 2, 3, 4}, + unavailable: []roachpb.StoreID{5}, + dead: nil, + decommissioning: nil, + }, + { + storeList: []roachpb.StoreID{1, 2, 3, 4, 5}, + expectedAction: AllocatorConsiderRebalance, + live: []roachpb.StoreID{1, 2, 3}, + unavailable: []roachpb.StoreID{4, 5}, + dead: nil, + decommissioning: nil, + }, + { + storeList: []roachpb.StoreID{1, 2, 3, 4, 5}, + expectedAction: AllocatorNoop, + live: []roachpb.StoreID{1, 2}, + unavailable: []roachpb.StoreID{3, 4, 5}, + dead: nil, + decommissioning: nil, + }, + { + storeList: []roachpb.StoreID{1, 2, 3, 4, 5}, + expectedAction: AllocatorRemoveDead, + live: []roachpb.StoreID{1, 2, 3}, + unavailable: []roachpb.StoreID{4}, + dead: []roachpb.StoreID{5}, + decommissioning: nil, + }, + { + storeList: []roachpb.StoreID{1, 2, 3, 4, 5}, + expectedAction: AllocatorRemoveDecommissioning, + live: []roachpb.StoreID{1, 2, 3}, + unavailable: []roachpb.StoreID{4}, + dead: nil, + decommissioning: []roachpb.StoreID{5}, + }, } stopper, _, sp, a, _ := createTestAllocator( /* deterministic */ false) @@ -4878,12 +4948,13 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) { for _, prefixKey := range []roachpb.RKey{roachpb.RKey(keys.NodeLivenessPrefix), roachpb.RKey(keys.SystemPrefix)} { for i, tcase := range testCases { - mockStorePool(sp, tcase.live, tcase.dead, tcase.decommissioning, tcase.decommissioned, nil) + mockStorePool(sp, tcase.live, tcase.unavailable, tcase.dead, tcase.decommissioning, []roachpb.StoreID{}, nil) desc := makeDescriptor(tcase.storeList) desc.EndKey = prefixKey action, _ := a.ComputeAction(ctx, zone, RangeInfo{Desc: &desc}) if tcase.expectedAction != action { - t.Errorf("Test case %d expected action %d, got action %d", i, tcase.expectedAction, action) + t.Errorf("test case %d expected action %q, got action %q", + i, allocatorActionNames[tcase.expectedAction], allocatorActionNames[action]) continue } } diff --git a/pkg/storage/store.go b/pkg/storage/store.go index cb503629e250..4cecd4b7e5cf 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -139,9 +139,9 @@ func TestStoreConfig(clock *hlc.Clock) StoreConfig { } st := cluster.MakeTestingClusterSettings() sc := StoreConfig{ - Settings: st, - AmbientCtx: log.AmbientContext{Tracer: st.Tracer}, - Clock: clock, + Settings: st, + AmbientCtx: 
log.AmbientContext{Tracer: st.Tracer}, + Clock: clock, CoalescedHeartbeatsInterval: 50 * time.Millisecond, RaftHeartbeatIntervalTicks: 1, ScanInterval: 10 * time.Minute, diff --git a/pkg/storage/store_pool.go b/pkg/storage/store_pool.go index f0030c46ad15..099e197b73c4 100644 --- a/pkg/storage/store_pool.go +++ b/pkg/storage/store_pool.go @@ -435,9 +435,10 @@ func (sp *StorePool) decommissioningReplicas( return } -// AvailableNodeCount returns the number of nodes which are -// considered available for use as allocation targets. This includes -// only live nodes which are not decommissioning. +// AvailableNodeCount returns the number of nodes which are considered +// available for use as allocation targets. This includes only nodes which are +// not dead or decommissioning. It notably does include nodes that are not +// considered live by node liveness but are also not yet considered dead. func (sp *StorePool) AvailableNodeCount() int { sp.detailsMu.RLock() defer sp.detailsMu.RUnlock() @@ -447,11 +448,14 @@ func (sp *StorePool) AvailableNodeCount() int { timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) for _, detail := range sp.detailsMu.storeDetails { + if detail.desc == nil { + continue + } switch s := detail.status(now, timeUntilStoreDead, 0, sp.nodeLivenessFn); s { - case storeStatusThrottled, storeStatusAvailable: + case storeStatusThrottled, storeStatusAvailable, storeStatusUnknown, storeStatusReplicaCorrupted: availableNodes[detail.desc.Node.NodeID] = struct{}{} - case storeStatusReplicaCorrupted, storeStatusDead, storeStatusUnknown, storeStatusDecommissioning: - // Do nothing; this node cannot be used. + case storeStatusDead, storeStatusDecommissioning: + // Do nothing; this node can't/shouldn't have any replicas on it. default: panic(fmt.Sprintf("unknown store status: %d", s)) }
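
// --- Illustrative sketch (not part of the patch) -------------------------
//
// The following standalone Go program is an editor-added illustration of the
// GetNeededReplicas(zoneConfigReplicaCount, availableNodes) semantics that the
// two commits above converge on. It is reconstructed from the visible hunks in
// pkg/storage/allocator.go and the updated TestAllocatorGetNeededReplicas
// table; the step that keeps the configured count when enough nodes are
// available (the lines elided between hunks, "unless an even number of
// replicas was specifically requested by the user") is an assumption inferred
// from that comment and the {4,4}->4 / {6,6}->6 test rows, so this is a
// sketch of the behavior, not the canonical implementation. The helper name
// neededReplicas is hypothetical; the real function is storage.GetNeededReplicas.
package main

import "fmt"

// neededReplicas mirrors the post-patch logic: clamp the zone config's replica
// count to the number of available (not dead, not decommissioning) nodes, then
// avoid even replication factors while staying within [3, numZoneReplicas].
func neededReplicas(zoneConfigReplicaCount int32, availableNodes int) int {
	numZoneReplicas := int(zoneConfigReplicaCount)
	need := numZoneReplicas

	// If there are fewer available nodes than replicas specified in the zone
	// config, shrink the target so the cluster can still function.
	if availableNodes < need {
		need = availableNodes
	}

	// Assumed elided step: when enough nodes are available, honor the
	// configured count exactly, even if it is even.
	if need == numZoneReplicas {
		return need
	}

	// Otherwise never settle on an even replication factor, and never go
	// below 3 or above the configured count.
	if need%2 == 0 {
		need--
	}
	if need < 3 {
		need = 3
	}
	if need > numZoneReplicas {
		need = numZoneReplicas
	}
	return need
}

func main() {
	// A few rows from the updated TestAllocatorGetNeededReplicas table above,
	// checked against the sketch.
	cases := []struct {
		zoneRepls  int32
		availNodes int
		expected   int
	}{
		{3, 1, 3}, {4, 4, 4}, {5, 4, 3}, {6, 5, 5}, {7, 6, 5}, {7, 7, 7},
	}
	for _, c := range cases {
		got := neededReplicas(c.zoneRepls, c.availNodes)
		fmt.Printf("zone=%d avail=%d -> need=%d (want %d)\n",
			c.zoneRepls, c.availNodes, got, c.expected)
	}
}
// --- End illustrative sketch ----------------------------------------------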