Skip to content

Commit

Permalink
storage: remove TestStoreRangeRebalance
Browse files Browse the repository at this point in the history
Add the check that preemptive snapshots are being used to
TestStoreRangeUpReplicate. Add TestReplicateQueueRebalance for testing
that basic rebalancing is working.

Fixes #10193
Fixes #10156
Fixes #9395
  • Loading branch information
petermattis committed Nov 8, 2016
1 parent 375eee3 commit 954dd90
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 163 deletions.
4 changes: 2 additions & 2 deletions pkg/storage/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func TestAllocatorRebalanceThrashing(t *testing.T) {
for i := range stores {
stores[i].rangeCount = mean
}
surplus := int32(math.Ceil(float64(mean)*RebalanceThreshold + 1))
surplus := int32(math.Ceil(float64(mean)*rebalanceThreshold + 1))
stores[0].rangeCount += surplus
stores[0].shouldRebalanceFrom = true
for i := 1; i < len(stores); i++ {
Expand All @@ -537,7 +537,7 @@ func TestAllocatorRebalanceThrashing(t *testing.T) {
// Subtract enough ranges from the first store to make it a suitable
// rebalance target. To maintain the specified mean, we then add that delta
// back to the rest of the replicas.
deficit := int32(math.Ceil(float64(mean)*RebalanceThreshold + 1))
deficit := int32(math.Ceil(float64(mean)*rebalanceThreshold + 1))
stores[0].rangeCount -= deficit
for i := 1; i < len(stores); i++ {
stores[i].rangeCount += int32(math.Ceil(float64(deficit) / float64(len(stores)-1)))
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ func (rcb rangeCountBalancer) improve(sl StoreList, excluded nodeIDSet) *roachpb
return candidate
}

// RebalanceThreshold is the minimum ratio of a store's range surplus to the
// rebalanceThreshold is the minimum ratio of a store's range surplus to the
// mean range count that permits rebalances away from that store.
var RebalanceThreshold = envutil.EnvOrDefaultFloat("COCKROACH_REBALANCE_THRESHOLD", 0.05)
var rebalanceThreshold = envutil.EnvOrDefaultFloat("COCKROACH_REBALANCE_THRESHOLD", 0.05)

func (rangeCountBalancer) shouldRebalance(store roachpb.StoreDescriptor, sl StoreList) bool {
// TODO(peter,bram,cuong): The FractionUsed check seems suspicious. When a
Expand All @@ -164,15 +164,15 @@ func (rangeCountBalancer) shouldRebalance(store roachpb.StoreDescriptor, sl Stor

// Rebalance if we're above the rebalance target, which is
// mean*(1+rebalanceThreshold).
target := int32(math.Ceil(sl.candidateCount.mean * (1 + RebalanceThreshold)))
target := int32(math.Ceil(sl.candidateCount.mean * (1 + rebalanceThreshold)))
rangeCountAboveTarget := store.Capacity.RangeCount > target

// Rebalance if the candidate store has a range count above the mean, and
// there exists another store that is underfull: its range count is smaller
// than mean*(1-rebalanceThreshold).
var rebalanceToUnderfullStore bool
if float64(store.Capacity.RangeCount) > sl.candidateCount.mean {
underfullThreshold := int32(math.Floor(sl.candidateCount.mean * (1 - RebalanceThreshold)))
underfullThreshold := int32(math.Floor(sl.candidateCount.mean * (1 - rebalanceThreshold)))
for _, desc := range sl.stores {
if desc.Capacity.RangeCount < underfullThreshold {
rebalanceToUnderfullStore = true
Expand Down
150 changes: 23 additions & 127 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/kr/pretty"
"github.com/pkg/errors"
"golang.org/x/net/context"

Expand Down Expand Up @@ -957,6 +956,29 @@ func TestStoreRangeUpReplicate(t *testing.T) {
}
return nil
})

var generated int64
var normalApplied int64
var preemptiveApplied int64
for _, s := range mtc.stores {
m := s.Metrics()
generated += m.RangeSnapshotsGenerated.Count()
normalApplied += m.RangeSnapshotsNormalApplied.Count()
preemptiveApplied += m.RangeSnapshotsPreemptiveApplied.Count()
if n := s.ReservationCount(); n != 0 {
t.Fatalf("expected 0 reservations, but found %d", n)
}
}
if generated == 0 {
t.Fatalf("expected at least 1 snapshot, but found 0")
}

if normalApplied != 0 {
t.Fatalf("expected 0 normal snapshots, but found %d", normalApplied)
}
if generated != preemptiveApplied {
t.Fatalf("expected %d preemptive snapshots, but found %d", generated, preemptiveApplied)
}
}

// TestStoreRangeCorruptionChangeReplicas verifies that the replication queue
Expand Down Expand Up @@ -2038,132 +2060,6 @@ func TestStoreRangeRemoveDead(t *testing.T) {
}
}

// TestStoreRangeRebalance verifies that the replication queue will take
// rebalancing opportunities and add a new replica on another store.
//
// The test builds a deliberately unbalanced layout of two ranges over six
// stores, repeatedly forces the replication queue on the first store to run
// until every store holds exactly one replica, and finally checks the snapshot
// metrics: at least one snapshot must have been generated, every generated
// snapshot must have been applied as a preemptive snapshot, none as a normal
// snapshot, and no store may be left with an outstanding reservation.
func TestStoreRangeRebalance(t *testing.T) {
defer leaktest.AfterTest(t)()

// TODO(peter,bram): clean up this test so this isn't required and unexport
// storage.RebalanceThreshold.
//
// The threshold is forced to 0 for the duration of the test; the deferred
// closure restores the value that was captured on entry.
defer func(threshold float64) {
storage.RebalanceThreshold = threshold
}(storage.RebalanceThreshold)
storage.RebalanceThreshold = 0

// Start multiTestContext with replica rebalancing enabled.
sc := storage.TestStoreConfig(nil)
sc.AllocatorOptions = storage.AllocatorOptions{AllowRebalance: true}
mtc := &multiTestContext{storeConfig: &sc}

mtc.Start(t, 6)
defer mtc.Stop()

// Split the initial range at "split" so that there are two ranges to place.
splitKey := roachpb.Key("split")
splitArgs := adminSplitArgs(roachpb.KeyMin, splitKey)
if _, err := client.SendWrapped(context.Background(), rg1(mtc.stores[0]), &splitArgs); err != nil {
t.Fatal(err)
}

// The setup for this test is to have two ranges like so:
// s1:r1, s2:r1r2, s3:r1, s4:r2, s5:r2, s6:-
// and to rebalance range 1 away from store 2 to store 6:
// s1:r1, s2:r2, s3:r1, s4:r2, s5:r2, s6:r1

// NOTE(review): the numeric arguments to replicateRange/unreplicateRange
// appear to be zero-based indexes into mtc.stores (so 1, 2 mean s2, s3),
// judging by the layout described above -- confirm against the helpers.
replica1 := mtc.stores[0].LookupReplica(roachpb.RKeyMin, nil)
mtc.replicateRange(replica1.Desc().RangeID, 1, 2)

replica2Key := roachpb.RKey(splitKey)
replica2 := mtc.stores[0].LookupReplica(replica2Key, nil)
mtc.replicateRange(replica2.Desc().RangeID, 1, 3, 4)
mtc.unreplicateRange(replica2.Desc().RangeID, 0)

// countReplicas returns, per store ID, the number of replicas listed in the
// current descriptors of the two ranges. Stores holding no replica have no
// entry in the returned map.
countReplicas := func() map[roachpb.StoreID]int {
counts := make(map[roachpb.StoreID]int)
rangeDescA := getRangeMetadata(roachpb.RKeyMin, mtc, t)
for _, repl := range rangeDescA.Replicas {
counts[repl.StoreID]++
}
rangeDescB := getRangeMetadata(replica2Key, mtc, t)
for _, repl := range rangeDescB.Replicas {
counts[repl.StoreID]++
}
return counts
}

// Check the initial conditions.
expectedStart := map[roachpb.StoreID]int{
roachpb.StoreID(1): 1,
roachpb.StoreID(2): 2,
roachpb.StoreID(3): 1,
roachpb.StoreID(4): 1,
roachpb.StoreID(5): 1,
}
actualStart := countReplicas()
if !reflect.DeepEqual(expectedStart, actualStart) {
t.Fatalf("replicas are not distributed as expected %s", pretty.Diff(expectedStart, actualStart))
}

// The desired end state: one replica on each of the six stores.
expected := map[roachpb.StoreID]int{
roachpb.StoreID(1): 1,
roachpb.StoreID(2): 1,
roachpb.StoreID(3): 1,
roachpb.StoreID(4): 1,
roachpb.StoreID(5): 1,
roachpb.StoreID(6): 1,
}

mtc.initGossipNetwork()
util.SucceedsSoon(t, func() error {
// As of this writing, replicas which hold their range's lease cannot
// be removed; forcefully transfer the lease for range 1 to another
// store to allow store 2's replica to be removed.
//
// NOTE(review): a failed transfer calls t.Fatal, which ends the test
// immediately instead of returning an error for SucceedsSoon to retry.
if err := mtc.transferLease(replica1.RangeID, mtc.stores[0]); err != nil {
t.Fatal(err)
}

// It takes at least two passes to achieve the final result. In the
// first pass, we add the replica to store 6. In the second pass, we
// remove the replica from store 2. Note that it can also take some time
// for the snapshot to arrive.
mtc.stores[0].ForceReplicationScanAndProcess()

// Gossip the stores so that the store pools are up to date. Note that
// there might be a delay between the call below and the asynchronous
// update of the store pools.
mtc.gossipStores()

// Exit when all stores have a single replica.
actual := countReplicas()
if !reflect.DeepEqual(expected, actual) {
return errors.Errorf("replicas are not distributed as expected %s", pretty.Diff(expected, actual))
}
return nil
})

// Sum the snapshot counters over all stores and, while iterating, require
// that no store still holds a reservation.
var generated int64
var normalApplied int64
var preemptiveApplied int64
for _, s := range mtc.stores {
m := s.Metrics()
generated += m.RangeSnapshotsGenerated.Count()
normalApplied += m.RangeSnapshotsNormalApplied.Count()
preemptiveApplied += m.RangeSnapshotsPreemptiveApplied.Count()
if n := s.ReservationCount(); n != 0 {
t.Fatalf("expected 0 reservations, but found %d", n)
}
}
if generated == 0 {
t.Fatalf("expected at least 1 snapshot, but found 0")
}

// Every generated snapshot must have been applied preemptively; a normal
// snapshot application is treated as a failure.
if normalApplied != 0 {
t.Fatalf("expected 0 normal snapshots, but found %d", normalApplied)
}
if generated != preemptiveApplied {
t.Fatalf("expected %d preemptive snapshots, but found %d", generated, preemptiveApplied)
}
}

// TestReplicateRogueRemovedNode ensures that a rogue removed node
// (i.e. a node that has been removed from the range but doesn't know
// it yet because it was down or partitioned away when it happened)
Expand Down
30 changes: 0 additions & 30 deletions pkg/storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,36 +1053,6 @@ func (m *multiTestContext) getRaftLeader(rangeID roachpb.RangeID) *storage.Repli
return raftLeaderRepl
}

// transferLease moves the lease for the specified rangeID to the destination
// store. The destination store must have a replica of the range on it.
//
// The current lease holder is determined from the destination replica's view
// of the lease; the transfer is then requested on the lease holder's replica.
// An error is returned if the destination store has no replica of the range,
// if the lease cannot be read from that replica, if the store holding the
// lease is not one of m.stores, or if the transfer itself fails.
func (m *multiTestContext) transferLease(rangeID roachpb.RangeID, destStore *storage.Store) error {
	destReplica, err := destStore.GetReplica(rangeID)
	if err != nil {
		return err
	}
	// Only the first value returned by GetLease is needed here.
	origLeasePtr, _ := destReplica.GetLease()
	if origLeasePtr == nil {
		return errors.Errorf("could not get lease ptr from replica %s", destReplica)
	}
	originalStoreID := origLeasePtr.Replica.StoreID

	// Get the store that currently holds the lease. Nil entries are skipped
	// defensively so that a missing store cannot cause a panic here.
	var origStore *storage.Store
	for _, store := range m.stores {
		if store != nil && store.Ident.StoreID == originalStoreID {
			origStore = store
			break
		}
	}
	// Without this check an unknown lease holder would lead to a nil pointer
	// dereference on the GetReplica call below.
	if origStore == nil {
		return errors.Errorf("could not find store %d holding the lease for range %d",
			originalStoreID, rangeID)
	}

	origRepl, err := origStore.GetReplica(destReplica.RangeID)
	if err != nil {
		return err
	}

	return origRepl.AdminTransferLease(destStore.Ident.StoreID)
}

// getArgs returns a GetRequest and GetResponse pair addressed to
// the default replica for the specified key.
func getArgs(key roachpb.Key) roachpb.GetRequest {
Expand Down
109 changes: 109 additions & 0 deletions pkg/storage/replicate_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Peter Mattis

package storage_test

import (
"os"
"testing"

"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/pkg/errors"
)

// TestReplicateQueueRebalance verifies that basic rebalancing is working:
// after splitting off a handful of ranges in a 5-node cluster started with
// base.ReplicationAuto, every store must eventually report at least one
// replica.
func TestReplicateQueueRebalance(t *testing.T) {
	defer leaktest.AfterTest(t)()

	// Set the gossip stores interval lower to speed up rebalancing. With the
	// default of 5s we have to wait ~5s for the rebalancing to start. Any
	// value that was already present in the environment is restored on exit
	// rather than being unconditionally removed.
	const gossipIntervalEnv = "COCKROACH_GOSSIP_STORES_INTERVAL"
	prevInterval, hadPrevInterval := os.LookupEnv(gossipIntervalEnv)
	if err := os.Setenv(gossipIntervalEnv, "100ms"); err != nil {
		t.Fatal(err)
	}
	defer func() {
		var err error
		if hadPrevInterval {
			err = os.Setenv(gossipIntervalEnv, prevInterval)
		} else {
			err = os.Unsetenv(gossipIntervalEnv)
		}
		if err != nil {
			t.Fatal(err)
		}
	}()

	// TODO(peter): Bump this to 10 nodes. Doing so is flaky until we have lease
	// rebalancing because store 1 can hold on to too many replicas. Consider:
	//
	//   [15 4 2 3 3 5 5 0 5 5]
	//
	// Store 1 is holding all of the leases so we can't rebalance away from
	// it. Every other store is within the ceil(average-replicas) threshold. So
	// there are no rebalancing opportunities for store 8.
	tc := testcluster.StartTestCluster(t, 5,
		base.TestClusterArgs{ReplicationMode: base.ReplicationAuto},
	)
	defer tc.Stopper().Stop()

	// Create a handful of ranges. Along with the initial ranges in the cluster,
	// this will result in 15 total ranges and 45 total replicas. Spread across
	// the 5 nodes in the cluster the average is 9 replicas per node. Note
	// that we don't expect to achieve that perfect balance as rebalancing
	// targets a threshold around the average.
	for i := 0; i < 10; i++ {
		tableID := keys.MaxReservedDescID + i + 1
		splitKey := keys.MakeRowSentinelKey(keys.MakeTablePrefix(uint32(tableID)))
		// Retry the split for as long as it fails with a range descriptor
		// update conflict; any other error is fatal.
		for {
			if _, _, err := tc.SplitRange(splitKey); err != nil {
				if testutils.IsError(err, "split at key .* failed: conflict updating range descriptors") {
					continue
				}
				t.Fatal(err)
			}
			break
		}
	}

	// countReplicas returns the replica count of every store, indexed by
	// StoreID-1.
	// NOTE(review): this assumes store IDs are exactly 1..len(tc.Servers);
	// any other ID would index out of range -- confirm.
	countReplicas := func() []int {
		counts := make([]int, len(tc.Servers))
		for _, server := range tc.Servers {
			err := server.Stores().VisitStores(func(store *storage.Store) error {
				counts[store.StoreID()-1] += store.ReplicaCount()
				return nil
			})
			if err != nil {
				t.Fatal(err)
			}
		}
		return counts
	}

	util.SucceedsSoon(t, func() error {
		counts := countReplicas()
		for _, c := range counts {
			// TODO(peter): This is a weak check for rebalancing. When lease
			// rebalancing is in place we can make this somewhat more robust.
			if c == 0 {
				err := errors.Errorf("not balanced: %d", counts)
				log.Info(context.Background(), err)
				return err
			}
		}
		return nil
	})
}

0 comments on commit 954dd90

Please sign in to comment.