diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go
index ba9fda281ba9..39c06edbb04f 100644
--- a/pkg/storage/allocator.go
+++ b/pkg/storage/allocator.go
@@ -184,12 +184,14 @@ type RangeInfo struct {
 }
 
 func rangeInfoForRepl(repl *Replica, desc *roachpb.RangeDescriptor) RangeInfo {
-	writesPerSecond, _ := repl.writeStats.avgQPS()
-	return RangeInfo{
-		Desc:            desc,
-		LogicalBytes:    repl.GetMVCCStats().Total(),
-		WritesPerSecond: writesPerSecond,
+	info := RangeInfo{
+		Desc:         desc,
+		LogicalBytes: repl.GetMVCCStats().Total(),
 	}
+	if writesPerSecond, dur := repl.writeStats.avgQPS(); dur >= MinStatsDuration {
+		info.WritesPerSecond = writesPerSecond
+	}
+	return info
 }
 
 // Allocator tries to spread replicas as evenly as possible across the stores
@@ -374,6 +376,25 @@ func (a *Allocator) AllocateTarget(
 	}
 }
 
+func (a Allocator) simulateRemoveTarget(
+	ctx context.Context,
+	targetStore roachpb.StoreID,
+	constraints config.Constraints,
+	candidates []roachpb.ReplicaDescriptor,
+	rangeInfo RangeInfo,
+) (roachpb.ReplicaDescriptor, string, error) {
+	// Update the local store statistics first so the simulated removal sees the proposed addition.
+	// TODO(a-robinson): This could theoretically interfere with decisions made by other goroutines,
+	// but as of October 2017 calls to the Allocator are mostly serialized by the ReplicateQueue
+	// (with the main exceptions being Scatter and the status server's allocator debug endpoint).
+	// Try to make this interfere less with other callers.
+	a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeInfo, roachpb.ADD_REPLICA)
+	defer func() {
+		a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeInfo, roachpb.REMOVE_REPLICA)
+	}()
+	return a.RemoveTarget(ctx, constraints, candidates, rangeInfo)
+}
+
 // RemoveTarget returns a suitable replica to remove from the provided replica
 // set. It first attempts to randomly select a target from the set of stores
 // that have greater than the average number of replicas. Failing that, it
@@ -444,7 +465,11 @@ func (a Allocator) RemoveTarget(
 // rebalance. This helps prevent a stampeding herd targeting an abnormally
 // under-utilized store.
 func (a Allocator) RebalanceTarget(
-	ctx context.Context, constraints config.Constraints, rangeInfo RangeInfo, filter storeFilter,
+	ctx context.Context,
+	constraints config.Constraints,
+	raftStatus *raft.Status,
+	rangeInfo RangeInfo,
+	filter storeFilter,
 ) (*roachpb.StoreDescriptor, string) {
 	sl, _, _ := a.storePool.getStoreList(rangeInfo.Desc.RangeID, filter)
@@ -491,6 +516,37 @@ func (a Allocator) RebalanceTarget(
 	if target == nil {
 		return nil, ""
 	}
+	// Simulate the removal decision that would follow this rebalance to verify
+	// that we wouldn't just remove the replica we're about to add.
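+	// Each iteration below tentatively adds the current best candidate to a
+	// copy of the range descriptor, asks RemoveTarget which replica it would
+	// cut, and, if the answer is the candidate itself, retries with the
+	// next-best candidate (or gives up if none remain).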
+	for len(candidates) > 0 {
+		if raftStatus == nil || raftStatus.Progress == nil {
+			break
+		}
+		newReplica := roachpb.ReplicaDescriptor{
+			NodeID:    target.store.Node.NodeID,
+			StoreID:   target.store.StoreID,
+			ReplicaID: rangeInfo.Desc.NextReplicaID,
+		}
+		desc := *rangeInfo.Desc
+		desc.Replicas = append(desc.Replicas, newReplica)
+		rangeInfo.Desc = &desc
+
+		replicaCandidates := simulateFilterUnremovableReplicas(raftStatus, desc.Replicas, newReplica.ReplicaID)
+
+		removeReplica, _, err := a.simulateRemoveTarget(ctx, target.store.StoreID, constraints, replicaCandidates, rangeInfo)
+		if err != nil {
+			log.Warningf(ctx, "simulating RemoveTarget failed: %s", err)
+			return nil, ""
+		}
+		if removeReplica.StoreID != target.store.StoreID {
+			break
+		}
+		newTargets := candidates.removeCandidate(*target)
+		newTarget := newTargets.selectGood(a.randGen)
+		if newTarget == nil {
+			return nil, ""
+		}
+		target = newTarget
+	}
 	details, err := json.Marshal(decisionDetails{
 		Target:   target.String(),
 		Existing: existingCandidates.String(),
@@ -877,6 +933,16 @@ func filterBehindReplicas(
 	return candidates
 }
 
+func simulateFilterUnremovableReplicas(
+	raftStatus *raft.Status,
+	replicas []roachpb.ReplicaDescriptor,
+	brandNewReplicaID roachpb.ReplicaID,
+) []roachpb.ReplicaDescriptor {
+	status := *raftStatus
+	status.Progress[uint64(brandNewReplicaID)] = raft.Progress{Match: 0}
+	return filterUnremovableReplicas(&status, replicas, brandNewReplicaID)
+}
+
 // filterUnremovableReplicas removes any unremovable replicas from the supplied
 // slice. An unremovable replica is one which is a necessary part of the
 // quorum that will result from removing 1 replica. We forgive brandNewReplicaID
diff --git a/pkg/storage/allocator_scorer.go b/pkg/storage/allocator_scorer.go
index 8d50b92f7f8d..2776cc3dcca1 100644
--- a/pkg/storage/allocator_scorer.go
+++ b/pkg/storage/allocator_scorer.go
@@ -285,6 +285,17 @@ func (cl candidateList) selectBad(randGen allocatorRand) *candidate {
 	return worst
 }
 
+// removeCandidate removes the specified candidate from the candidateList.
+func (cl candidateList) removeCandidate(c candidate) candidateList {
+	for i := 0; i < len(cl); i++ {
+		if cl[i].store.StoreID == c.store.StoreID {
+			cl = append(cl[:i], cl[i+1:]...)
+			break
+		}
+	}
+	return cl
+}
+
 // allocateCandidates creates a candidate list of all stores that can be used
 // for allocating a new replica ordered from the best to the worst. Only
 // stores that meet the criteria are included in the list.
diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go
index 3c91b3ba0fa5..2a9ef75e2254 100644
--- a/pkg/storage/allocator_test.go
+++ b/pkg/storage/allocator_test.go
@@ -36,6 +36,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/rpc"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/gossiputil"
 	"github.com/cockroachdb/cockroach/pkg/util"
@@ -657,6 +658,7 @@ func TestAllocatorRebalance(t *testing.T) {
 		result, _ := a.RebalanceTarget(
 			ctx,
 			config.Constraints{},
+			nil,
 			testRangeInfo([]roachpb.ReplicaDescriptor{{StoreID: 3}}, firstRange),
 			storeFilterThrottled,
 		)
@@ -685,6 +687,136 @@ func TestAllocatorRebalance(t *testing.T) {
 	}
 }
 
+// TestAllocatorRebalanceTarget verifies that the allocator does not rebalance
+// to a target that it would immediately remove again.
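+// The cluster is set up so that the pure count-based heuristic would pick a
+// store in an already-represented locality, which the removal simulation must
+// then veto.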
+func TestAllocatorRebalanceTarget(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	manual := hlc.NewManualClock(123)
+	clock := hlc.NewClock(manual.UnixNano, time.Nanosecond)
+	stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false)
+	defer stopper.Stop(context.Background())
+	// We use 4 stores in this test: stores 1 and 2 are in datacenter a, store 3
+	// is in datacenter b, and store 4 is in datacenter c, so the replicas are
+	// spread across three datacenters. Store 4 starts out with far more
+	// replicas than the other stores, so without the simulation in
+	// RebalanceTarget the allocator would pick store 2 as the rebalance
+	// target, only to remove the new replica on store 2 again immediately in
+	// order to preserve locality diversity. With the simulation,
+	// RebalanceTarget avoids such a pointless rebalance.
+	stores := []*roachpb.StoreDescriptor{
+		{
+			StoreID: 1,
+			Node: roachpb.NodeDescriptor{
+				NodeID: 1,
+				Locality: roachpb.Locality{
+					Tiers: []roachpb.Tier{
+						{Key: "datacenter", Value: "a"},
+					},
+				},
+			},
+			Capacity: roachpb.StoreCapacity{
+				RangeCount: 50,
+			},
+		},
+		{
+			StoreID: 2,
+			Node: roachpb.NodeDescriptor{
+				NodeID: 2,
+				Locality: roachpb.Locality{
+					Tiers: []roachpb.Tier{
+						{Key: "datacenter", Value: "a"},
+					},
+				},
+			},
+			Capacity: roachpb.StoreCapacity{
+				RangeCount: 56,
+			},
+		},
+		{
+			StoreID: 3,
+			Node: roachpb.NodeDescriptor{
+				NodeID: 3,
+				Locality: roachpb.Locality{
+					Tiers: []roachpb.Tier{
+						{Key: "datacenter", Value: "b"},
+					},
+				},
+			},
+			Capacity: roachpb.StoreCapacity{
+				RangeCount: 55,
+			},
+		},
+		{
+			StoreID: 4,
+			Node: roachpb.NodeDescriptor{
+				NodeID: 4,
+				Locality: roachpb.Locality{
+					Tiers: []roachpb.Tier{
+						{Key: "datacenter", Value: "c"},
+					},
+				},
+			},
+			Capacity: roachpb.StoreCapacity{
+				RangeCount: 150,
+			},
+		},
+	}
+	sg := gossiputil.NewStoreGossiper(g)
+	sg.GossipStores(stores, t)
+
+	st := a.storePool.st
+	EnableStatsBasedRebalancing.Override(&st.SV, false)
+	replicas := []roachpb.ReplicaDescriptor{
+		{
+			StoreID: 1,
+			NodeID:  1,
+		},
+		{
+			StoreID: 3,
+			NodeID:  3,
+		},
+		{
+			StoreID: 4,
+			NodeID:  4,
+		},
+	}
+	repl := &Replica{RangeID: firstRange}
+
+	repl.mu.Lock()
+	repl.mu.state.Stats = &enginepb.MVCCStats{}
+	repl.mu.Unlock()
+
+	rs := newReplicaStats(clock, nil)
+	repl.writeStats = rs
+
+	desc := &roachpb.RangeDescriptor{
+		Replicas: replicas,
+		RangeID:  firstRange,
+	}
+
+	rangeInfo := rangeInfoForRepl(repl, desc)
+
+	status := &raft.Status{
+		Progress: make(map[uint64]raft.Progress),
+	}
+	for _, replica := range replicas {
+		status.Progress[uint64(replica.NodeID)] = raft.Progress{
+			Match: 10,
+		}
+	}
+	for i := 0; i < 10; i++ {
+		result, _ := a.RebalanceTarget(
+			context.Background(),
+			config.Constraints{},
+			status,
+			rangeInfo,
+			storeFilterThrottled,
+		)
+		if result != nil {
+			t.Errorf("expected no rebalance, but got %d", result.StoreID)
+		}
+	}
+}
+
 func TestAllocatorRebalanceDeadNodes(t *testing.T) {
 	defer leaktest.AfterTest(t)()
@@ -754,7 +886,7 @@ func TestAllocatorRebalanceDeadNodes(t *testing.T) {
 	for _, c := range testCases {
 		t.Run("", func(t *testing.T) {
 			result, _ := a.RebalanceTarget(
-				ctx, config.Constraints{}, testRangeInfo(c.existing, firstRange), storeFilterThrottled)
+				ctx, config.Constraints{}, nil, testRangeInfo(c.existing, firstRange), storeFilterThrottled)
 			if c.expected > 0 {
 				if result == nil {
 					t.Fatalf("expected %d, but found nil", c.expected)
@@ -949,6 +1081,7 @@ func TestAllocatorRebalanceByCount(t *testing.T) {
 			result, _ := a.RebalanceTarget(
 				ctx,
 				config.Constraints{},
+				nil,
 				testRangeInfo([]roachpb.ReplicaDescriptor{{StoreID: stores[0].StoreID}}, firstRange),
 				storeFilterThrottled,
 			)
@@ -2552,6 +2685,7 @@ func TestAllocatorRebalanceAway(t *testing.T) {
 			actual, _ := a.RebalanceTarget(
 				ctx,
 				constraints,
+				nil,
 				testRangeInfo(existingReplicas, firstRange),
 				storeFilterThrottled,
 			)
@@ -2672,6 +2806,7 @@ func Example_rebalancing() {
 		target, _ := alloc.RebalanceTarget(
 			context.Background(),
 			config.Constraints{},
+			nil,
 			testRangeInfo([]roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}}, firstRange),
 			storeFilterThrottled,
 		)
diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go
index 96d5da5b27ae..f930b95b54b5 100644
--- a/pkg/storage/replicate_queue.go
+++ b/pkg/storage/replicate_queue.go
@@ -184,7 +184,7 @@ func (rq *replicateQueue) shouldQueue(
 	}
 
 	if !rq.store.TestingKnobs().DisableReplicaRebalancing {
-		target, _ := rq.allocator.RebalanceTarget(ctx, zone.Constraints, rangeInfo, storeFilterThrottled)
+		target, _ := rq.allocator.RebalanceTarget(ctx, zone.Constraints, repl.RaftStatus(), rangeInfo, storeFilterThrottled)
 		if target != nil {
 			log.VEventf(ctx, 2, "rebalance target found, enqueuing")
 			return true, 0
@@ -461,7 +461,7 @@ func (rq *replicateQueue) processOneChange(
 		if !rq.store.TestingKnobs().DisableReplicaRebalancing {
 			rebalanceStore, details := rq.allocator.RebalanceTarget(
-				ctx, zone.Constraints, rangeInfo, storeFilterThrottled)
+				ctx, zone.Constraints, repl.RaftStatus(), rangeInfo, storeFilterThrottled)
 			if rebalanceStore == nil {
 				log.VEventf(ctx, 1, "no suitable rebalance target")
 			} else {
@@ -574,7 +574,8 @@ func (rq *replicateQueue) addReplica(
 	if err := repl.changeReplicas(ctx, roachpb.ADD_REPLICA, target, desc, priority, reason, details); err != nil {
 		return err
 	}
-	rq.allocator.storePool.updateLocalStoreAfterRebalance(target.StoreID, repl, roachpb.ADD_REPLICA)
+	rangeInfo := rangeInfoForRepl(repl, desc)
+	rq.allocator.storePool.updateLocalStoreAfterRebalance(target.StoreID, rangeInfo, roachpb.ADD_REPLICA)
 	return nil
 }
 
@@ -593,7 +594,8 @@ func (rq *replicateQueue) removeReplica(
 	if err := repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, target, desc, reason, details); err != nil {
 		return err
 	}
-	rq.allocator.storePool.updateLocalStoreAfterRebalance(target.StoreID, repl, roachpb.REMOVE_REPLICA)
+	rangeInfo := rangeInfoForRepl(repl, desc)
+	rq.allocator.storePool.updateLocalStoreAfterRebalance(target.StoreID, rangeInfo, roachpb.REMOVE_REPLICA)
 	return nil
 }
 
diff --git a/pkg/storage/store_pool.go b/pkg/storage/store_pool.go
index cf8c0f1a803e..702525b00d65 100644
--- a/pkg/storage/store_pool.go
+++ b/pkg/storage/store_pool.go
@@ -330,7 +330,7 @@ func (sp *StorePool) deadReplicasGossipUpdate(_ string, content roachpb.Value) {
 // updateLocalStoreAfterRebalance is used to update the local copy of the
 // target store immediately after a rebalance.
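+// It takes a RangeInfo snapshot rather than a live *Replica so that callers,
+// including the allocator's removal simulation, can describe a change without
+// holding a real replica.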
 func (sp *StorePool) updateLocalStoreAfterRebalance(
-	storeID roachpb.StoreID, repl *Replica, changeType roachpb.ReplicaChangeType,
+	storeID roachpb.StoreID, rangeInfo RangeInfo, changeType roachpb.ReplicaChangeType,
 ) {
 	sp.detailsMu.Lock()
 	defer sp.detailsMu.Unlock()
@@ -343,23 +343,18 @@ func (sp *StorePool) updateLocalStoreAfterRebalance(
 	}
 	switch changeType {
 	case roachpb.ADD_REPLICA:
-		detail.desc.Capacity.LogicalBytes += repl.GetMVCCStats().Total()
-		if qps, dur := repl.writeStats.avgQPS(); dur >= MinStatsDuration {
-			detail.desc.Capacity.WritesPerSecond += qps
-		}
+		detail.desc.Capacity.LogicalBytes += rangeInfo.LogicalBytes
+		detail.desc.Capacity.WritesPerSecond += rangeInfo.WritesPerSecond
 	case roachpb.REMOVE_REPLICA:
-		total := repl.GetMVCCStats().Total()
-		if detail.desc.Capacity.LogicalBytes <= total {
+		if detail.desc.Capacity.LogicalBytes <= rangeInfo.LogicalBytes {
 			detail.desc.Capacity.LogicalBytes = 0
 		} else {
-			detail.desc.Capacity.LogicalBytes -= total
+			detail.desc.Capacity.LogicalBytes -= rangeInfo.LogicalBytes
 		}
-		if qps, dur := repl.writeStats.avgQPS(); dur >= MinStatsDuration {
-			if detail.desc.Capacity.WritesPerSecond <= qps {
-				detail.desc.Capacity.WritesPerSecond = 0
-			} else {
-				detail.desc.Capacity.WritesPerSecond -= qps
-			}
+		if detail.desc.Capacity.WritesPerSecond <= rangeInfo.WritesPerSecond {
+			detail.desc.Capacity.WritesPerSecond = 0
+		} else {
+			detail.desc.Capacity.WritesPerSecond -= rangeInfo.WritesPerSecond
 		}
 	}
 	sp.detailsMu.storeDetails[storeID] = &detail
diff --git a/pkg/storage/store_pool_test.go b/pkg/storage/store_pool_test.go
index 1a35ef455592..a8239e7dca4b 100644
--- a/pkg/storage/store_pool_test.go
+++ b/pkg/storage/store_pool_test.go
@@ -354,21 +354,8 @@ func TestStorePoolUpdateLocalStore(t *testing.T) {
 		},
 	}
 	sg.GossipStores(stores, t)
-	node := roachpb.NodeDescriptor{NodeID: roachpb.NodeID(1)}
-	eng := engine.NewInMem(roachpb.Attributes{}, 1<<20)
-	stopper.AddCloser(eng)
-	cfg := TestStoreConfig(clock)
-	cfg.Transport = NewDummyRaftTransport(cfg.Settings)
-	store := NewStore(cfg, eng, &node)
-	rg := roachpb.RangeDescriptor{
-		RangeID:  1,
-		StartKey: roachpb.RKey([]byte("a")),
-		EndKey:   roachpb.RKey([]byte("b")),
-	}
-	replica, err := NewReplica(&rg, store, roachpb.ReplicaID(0))
-	if err != nil {
-		t.Fatalf("make replica error : %s", err)
-	}
+
+	replica := &Replica{RangeID: 1}
 	replica.mu.Lock()
 	replica.mu.state.Stats = &enginepb.MVCCStats{
 		KeyBytes: 2,
@@ -382,7 +369,13 @@ func TestStorePoolUpdateLocalStore(t *testing.T) {
 	manual.Increment(int64(MinStatsDuration + time.Second))
 	replica.writeStats = rs
 
-	sp.updateLocalStoreAfterRebalance(roachpb.StoreID(1), replica, roachpb.ADD_REPLICA)
+	rangeDesc := &roachpb.RangeDescriptor{
+		RangeID: replica.RangeID,
+	}
+
+	rangeInfo := rangeInfoForRepl(replica, rangeDesc)
+
+	sp.updateLocalStoreAfterRebalance(roachpb.StoreID(1), rangeInfo, roachpb.ADD_REPLICA)
 	desc, ok := sp.getStoreDescriptor(roachpb.StoreID(1))
 	if !ok {
 		t.Fatalf("couldn't find StoreDescriptor for Store ID %d", 1)
@@ -393,7 +386,7 @@ func TestStorePoolUpdateLocalStore(t *testing.T) {
 			expectedBytes, desc.Capacity.LogicalBytes, expectedQPS, desc.Capacity.WritesPerSecond)
 	}
 
-	sp.updateLocalStoreAfterRebalance(roachpb.StoreID(2), replica, roachpb.REMOVE_REPLICA)
+	sp.updateLocalStoreAfterRebalance(roachpb.StoreID(2), rangeInfo, roachpb.REMOVE_REPLICA)
 	desc, ok = sp.getStoreDescriptor(roachpb.StoreID(2))
 	if !ok {
 		t.Fatalf("couldn't find StoreDescriptor for Store ID %d", 2)
@@ -433,12 +426,14 @@ func TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) {
 		t.Fatalf("make replica error : %s", err)
 	}
 
+	rangeInfo := rangeInfoForRepl(replica, &rg)
+
 	// Update StorePool, which should be a no-op.
 	storeID := roachpb.StoreID(1)
 	if _, ok := sp.getStoreDescriptor(storeID); ok {
 		t.Fatalf("StoreDescriptor not gossiped, should not be found")
 	}
-	sp.updateLocalStoreAfterRebalance(storeID, replica, roachpb.ADD_REPLICA)
+	sp.updateLocalStoreAfterRebalance(storeID, rangeInfo, roachpb.ADD_REPLICA)
 	if _, ok := sp.getStoreDescriptor(storeID); ok {
 		t.Fatalf("StoreDescriptor still not gossiped, should not be found")
 	}
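
To make the control flow of the new RebalanceTarget loop concrete outside the allocator's full machinery, here is a minimal, self-contained Go sketch of the same add-then-simulate-removal idea. Everything in it (the store type, the diversity-first pickRemoveTarget, the count-based pickRebalanceTarget, and the numbers in main) is a simplified stand-in invented for illustration, not CockroachDB's API; only the overall shape mirrors the patch: tentatively add a candidate, ask the removal heuristic what it would cut, and reject the candidate if it would cut the replica just added.

package main

import (
	"fmt"
	"sort"
)

type store struct {
	id         int
	locality   string // e.g. a datacenter name
	rangeCount int
}

// pickRemoveTarget mimics a diversity-first removal heuristic: among the
// given replicas, remove one from the most over-represented locality,
// breaking ties by the highest range count.
func pickRemoveTarget(replicas []store) store {
	byLocality := map[string]int{}
	for _, s := range replicas {
		byLocality[s.locality]++
	}
	cands := append([]store(nil), replicas...)
	sort.Slice(cands, func(i, j int) bool {
		ci, cj := byLocality[cands[i].locality], byLocality[cands[j].locality]
		if ci != cj {
			return ci > cj // most over-represented locality first
		}
		return cands[i].rangeCount > cands[j].rangeCount
	})
	return cands[0]
}

// pickRebalanceTarget mimics a count-based heuristic (fewest ranges first),
// then verifies each candidate by simulating the removal that would follow.
func pickRebalanceTarget(all, replicas []store) (store, bool) {
	existing := map[int]bool{}
	for _, r := range replicas {
		existing[r.id] = true
	}
	var cands []store
	for _, s := range all {
		if !existing[s.id] {
			cands = append(cands, s)
		}
	}
	sort.Slice(cands, func(i, j int) bool { return cands[i].rangeCount < cands[j].rangeCount })
	for _, c := range cands {
		// Tentatively add c, then ask the removal heuristic what it would cut.
		simulated := append(append([]store(nil), replicas...), c)
		if pickRemoveTarget(simulated).id == c.id {
			continue // we'd remove c right away; try the next candidate
		}
		return c, true
	}
	return store{}, false
}

func main() {
	stores := []store{
		{1, "a", 50}, {2, "a", 56}, {3, "b", 55}, {4, "c", 150},
	}
	replicas := []store{stores[0], stores[2], stores[3]}
	if t, ok := pickRebalanceTarget(stores, replicas); ok {
		fmt.Printf("rebalance to store %d\n", t.id)
	} else {
		fmt.Println("no useful rebalance target") // expected for this setup
	}
}

With the same store counts as TestAllocatorRebalanceTarget above (stores 1 and 2 in locality a, replicas on stores 1, 3, and 4), the only candidate is store 2, the simulated removal evicts it immediately, and the sketch reports that no useful rebalance exists, which is exactly the behavior the test asserts for the real allocator.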