Skip to content

Commit

Permalink
Merge pull request #10515 from petermattis/pmattis/test-store-range-r…
Browse files Browse the repository at this point in the history
…ebalance

storage: rewrite TestStoreRangeRebalance
  • Loading branch information
petermattis authored Nov 8, 2016
2 parents 2bde1cc + f920b33 commit 24dac05
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 163 deletions.
4 changes: 2 additions & 2 deletions pkg/storage/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func TestAllocatorRebalanceThrashing(t *testing.T) {
for i := range stores {
stores[i].rangeCount = mean
}
surplus := int32(math.Ceil(float64(mean)*RebalanceThreshold + 1))
surplus := int32(math.Ceil(float64(mean)*rebalanceThreshold + 1))
stores[0].rangeCount += surplus
stores[0].shouldRebalanceFrom = true
for i := 1; i < len(stores); i++ {
Expand All @@ -537,7 +537,7 @@ func TestAllocatorRebalanceThrashing(t *testing.T) {
// Subtract enough ranges from the first store to make it a suitable
// rebalance target. To maintain the specified mean, we then add that delta
// back to the rest of the replicas.
deficit := int32(math.Ceil(float64(mean)*RebalanceThreshold + 1))
deficit := int32(math.Ceil(float64(mean)*rebalanceThreshold + 1))
stores[0].rangeCount -= deficit
for i := 1; i < len(stores); i++ {
stores[i].rangeCount += int32(math.Ceil(float64(deficit) / float64(len(stores)-1)))
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ func (rcb rangeCountBalancer) improve(sl StoreList, excluded nodeIDSet) *roachpb
return candidate
}

// RebalanceThreshold is the minimum ratio of a store's range surplus to the
// rebalanceThreshold is the minimum ratio of a store's range surplus to the
// mean range count that permits rebalances away from that store.
var RebalanceThreshold = envutil.EnvOrDefaultFloat("COCKROACH_REBALANCE_THRESHOLD", 0.05)
var rebalanceThreshold = envutil.EnvOrDefaultFloat("COCKROACH_REBALANCE_THRESHOLD", 0.05)

func (rangeCountBalancer) shouldRebalance(store roachpb.StoreDescriptor, sl StoreList) bool {
// TODO(peter,bram,cuong): The FractionUsed check seems suspicious. When a
Expand All @@ -164,15 +164,15 @@ func (rangeCountBalancer) shouldRebalance(store roachpb.StoreDescriptor, sl Stor

// Rebalance if we're above the rebalance target, which is
// mean*(1+RebalanceThreshold).
target := int32(math.Ceil(sl.candidateCount.mean * (1 + RebalanceThreshold)))
target := int32(math.Ceil(sl.candidateCount.mean * (1 + rebalanceThreshold)))
rangeCountAboveTarget := store.Capacity.RangeCount > target

// Rebalance if the candidate store has a range count above the mean, and
// there exists another store that is underfull: its range count is smaller
// than mean*(1-RebalanceThreshold).
var rebalanceToUnderfullStore bool
if float64(store.Capacity.RangeCount) > sl.candidateCount.mean {
underfullThreshold := int32(math.Floor(sl.candidateCount.mean * (1 - RebalanceThreshold)))
underfullThreshold := int32(math.Floor(sl.candidateCount.mean * (1 - rebalanceThreshold)))
for _, desc := range sl.stores {
if desc.Capacity.RangeCount < underfullThreshold {
rebalanceToUnderfullStore = true
Expand Down
150 changes: 23 additions & 127 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/kr/pretty"
"github.com/pkg/errors"
"golang.org/x/net/context"

Expand Down Expand Up @@ -957,6 +956,29 @@ func TestStoreRangeUpReplicate(t *testing.T) {
}
return nil
})

var generated int64
var normalApplied int64
var preemptiveApplied int64
for _, s := range mtc.stores {
m := s.Metrics()
generated += m.RangeSnapshotsGenerated.Count()
normalApplied += m.RangeSnapshotsNormalApplied.Count()
preemptiveApplied += m.RangeSnapshotsPreemptiveApplied.Count()
if n := s.ReservationCount(); n != 0 {
t.Fatalf("expected 0 reservations, but found %d", n)
}
}
if generated == 0 {
t.Fatalf("expected at least 1 snapshot, but found 0")
}

if normalApplied != 0 {
t.Fatalf("expected 0 normal snapshots, but found %d", normalApplied)
}
if generated != preemptiveApplied {
t.Fatalf("expected %d preemptive snapshots, but found %d", generated, preemptiveApplied)
}
}

// TestStoreRangeCorruptionChangeReplicas verifies that the replication queue
Expand Down Expand Up @@ -2038,132 +2060,6 @@ func TestStoreRangeRemoveDead(t *testing.T) {
}
}

// TestStoreRangeRebalance verifies that the replication queue will take
// rebalancing opportunities and add a new replica on another store.
func TestStoreRangeRebalance(t *testing.T) {
defer leaktest.AfterTest(t)()

// TODO(peter,bram): clean up this test so this isn't required and unexport
// storage.RebalanceThreshold.
defer func(threshold float64) {
storage.RebalanceThreshold = threshold
}(storage.RebalanceThreshold)
storage.RebalanceThreshold = 0

// Start multiTestContext with replica rebalancing enabled.
sc := storage.TestStoreConfig(nil)
sc.AllocatorOptions = storage.AllocatorOptions{AllowRebalance: true}
mtc := &multiTestContext{storeConfig: &sc}

mtc.Start(t, 6)
defer mtc.Stop()

splitKey := roachpb.Key("split")
splitArgs := adminSplitArgs(roachpb.KeyMin, splitKey)
if _, err := client.SendWrapped(context.Background(), rg1(mtc.stores[0]), &splitArgs); err != nil {
t.Fatal(err)
}

// The setup for this test is to have two ranges like so:
// s1:r1, s2:r1r2, s3:r1, s4:r2, s5:r2, s6:-
// and to rebalance range 1 away from store 2 to store 6:
// s1:r1, s2:r2, s3:r1, s4:r2, s5:r2, s6:r1

replica1 := mtc.stores[0].LookupReplica(roachpb.RKeyMin, nil)
mtc.replicateRange(replica1.Desc().RangeID, 1, 2)

replica2Key := roachpb.RKey(splitKey)
replica2 := mtc.stores[0].LookupReplica(replica2Key, nil)
mtc.replicateRange(replica2.Desc().RangeID, 1, 3, 4)
mtc.unreplicateRange(replica2.Desc().RangeID, 0)

countReplicas := func() map[roachpb.StoreID]int {
counts := make(map[roachpb.StoreID]int)
rangeDescA := getRangeMetadata(roachpb.RKeyMin, mtc, t)
for _, repl := range rangeDescA.Replicas {
counts[repl.StoreID]++
}
rangeDescB := getRangeMetadata(replica2Key, mtc, t)
for _, repl := range rangeDescB.Replicas {
counts[repl.StoreID]++
}
return counts
}

// Check the initial conditions.
expectedStart := map[roachpb.StoreID]int{
roachpb.StoreID(1): 1,
roachpb.StoreID(2): 2,
roachpb.StoreID(3): 1,
roachpb.StoreID(4): 1,
roachpb.StoreID(5): 1,
}
actualStart := countReplicas()
if !reflect.DeepEqual(expectedStart, actualStart) {
t.Fatalf("replicas are not distributed as expected %s", pretty.Diff(expectedStart, actualStart))
}

expected := map[roachpb.StoreID]int{
roachpb.StoreID(1): 1,
roachpb.StoreID(2): 1,
roachpb.StoreID(3): 1,
roachpb.StoreID(4): 1,
roachpb.StoreID(5): 1,
roachpb.StoreID(6): 1,
}

mtc.initGossipNetwork()
util.SucceedsSoon(t, func() error {
// As of this writing, replicas which hold their range's lease cannot
// be removed; forcefully transfer the lease for range 1 to another
// store to allow store 2's replica to be removed.
if err := mtc.transferLease(replica1.RangeID, mtc.stores[0]); err != nil {
t.Fatal(err)
}

// It takes at least two passes to achieve the final result. In the
// first pass, we add the replica to store 6. In the second pass, we
// remove the replica from store 2. Note that it can also take some time
// for the snapshot to arrive.
mtc.stores[0].ForceReplicationScanAndProcess()

// Gossip the stores so that the store pools are up to date. Note that
// there might be a delay between the call below and the asynchronous
// update of the store pools.
mtc.gossipStores()

// Exit when all stores have a single replica.
actual := countReplicas()
if !reflect.DeepEqual(expected, actual) {
return errors.Errorf("replicas are not distributed as expected %s", pretty.Diff(expected, actual))
}
return nil
})

var generated int64
var normalApplied int64
var preemptiveApplied int64
for _, s := range mtc.stores {
m := s.Metrics()
generated += m.RangeSnapshotsGenerated.Count()
normalApplied += m.RangeSnapshotsNormalApplied.Count()
preemptiveApplied += m.RangeSnapshotsPreemptiveApplied.Count()
if n := s.ReservationCount(); n != 0 {
t.Fatalf("expected 0 reservations, but found %d", n)
}
}
if generated == 0 {
t.Fatalf("expected at least 1 snapshot, but found 0")
}

if normalApplied != 0 {
t.Fatalf("expected 0 normal snapshots, but found %d", normalApplied)
}
if generated != preemptiveApplied {
t.Fatalf("expected %d preemptive snapshots, but found %d", generated, preemptiveApplied)
}
}

// TestReplicateRogueRemovedNode ensures that a rogue removed node
// (i.e. a node that has been removed from the range but doesn't know
// it yet because it was down or partitioned away when it happened)
Expand Down
30 changes: 0 additions & 30 deletions pkg/storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,36 +1053,6 @@ func (m *multiTestContext) getRaftLeader(rangeID roachpb.RangeID) *storage.Repli
return raftLeaderRepl
}

// transferLease moves the lease for the specified rangeID to the destination
// store. The destination store must have a replica of the range on it.
func (m *multiTestContext) transferLease(rangeID roachpb.RangeID, destStore *storage.Store) error {
destReplica, err := destStore.GetReplica(rangeID)
if err != nil {
return err
}
origLeasePtr, _ := destReplica.GetLease()
if origLeasePtr == nil {
return errors.Errorf("could not get lease ptr from replica %s", destReplica)
}
originalStoreID := origLeasePtr.Replica.StoreID

// Get the replica that currently holds the lease.
var origStore *storage.Store
for _, store := range m.stores {
if store.Ident.StoreID == originalStoreID {
origStore = store
break
}
}

origRepl, err := origStore.GetReplica(destReplica.RangeID)
if err != nil {
return err
}

return origRepl.AdminTransferLease(destStore.Ident.StoreID)
}

// getArgs returns a GetRequest and GetResponse pair addressed to
// the default replica for the specified key.
func getArgs(key roachpb.Key) roachpb.GetRequest {
Expand Down
110 changes: 110 additions & 0 deletions pkg/storage/replicate_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Peter Mattis

package storage_test

import (
"os"
"testing"

"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/pkg/errors"
)

func TestReplicateQueueRebalance(t *testing.T) {
defer leaktest.AfterTest(t)()

// Set the gossip stores interval lower to speed up rebalancing. With the
// default of 5s we have to wait ~5s for the rebalancing to start.
if err := os.Setenv("COCKROACH_GOSSIP_STORES_INTERVAL", "100ms"); err != nil {
t.Fatal(err)
}
defer func() {
if err := os.Unsetenv("COCKROACH_GOSSIP_STORES_INTERVAL"); err != nil {
t.Fatal(err)
}
}()

// TODO(peter): Bump this to 10 nodes. Doing so is flaky until we have lease
// rebalancing because store 1 can hold on to too many replicas. Consider:
//
// [15 4 2 3 3 5 5 0 5 5]
//
// Store 1 is holding all of the leases so we can't rebalance away from
// it. Every other store has within the ceil(average-replicas) threshold. So
// there are no rebalancing opportunities for store 8.
tc := testcluster.StartTestCluster(t, 5,
base.TestClusterArgs{ReplicationMode: base.ReplicationAuto},
)
defer tc.Stopper().Stop()

// Create a handful of ranges. Along with the initial ranges in the cluster,
// this will result in 15 total ranges and 45 total replicas. Spread across
// the 10 nodes in the cluster the average is 4.5 replicas per node. Note
// that we don't expect to achieve that perfect balance as rebalancing
// targets a threshold around the average.
for i := 0; i < 10; i++ {
tableID := keys.MaxReservedDescID + i + 1
splitKey := keys.MakeRowSentinelKey(keys.MakeTablePrefix(uint32(tableID)))
for {
if _, _, err := tc.SplitRange(splitKey); err != nil {
if testutils.IsError(err, "split at key .* failed: conflict updating range descriptors") ||
testutils.IsError(err, "range is already split at key") {
continue
}
t.Fatal(err)
}
break
}
}

countReplicas := func() []int {
counts := make([]int, len(tc.Servers))
for _, s := range tc.Servers {
err := s.Stores().VisitStores(func(s *storage.Store) error {
counts[s.StoreID()-1] += s.ReplicaCount()
return nil
})
if err != nil {
t.Fatal(err)
}
}
return counts
}

util.SucceedsSoon(t, func() error {
counts := countReplicas()
for _, c := range counts {
// TODO(peter): This is a weak check for rebalancing. When lease
// rebalancing is in place we can make this somewhat more robust.
if c == 0 {
err := errors.Errorf("not balanced: %d", counts)
log.Info(context.Background(), err)
return err
}
}
return nil
})
}

0 comments on commit 24dac05

Please sign in to comment.