diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go
index 98316c6afaa0..82ba633e26e9 100644
--- a/pkg/storage/allocator_test.go
+++ b/pkg/storage/allocator_test.go
@@ -518,7 +518,7 @@ func TestAllocatorRebalanceThrashing(t *testing.T) {
 		for i := range stores {
 			stores[i].rangeCount = mean
 		}
-		surplus := int32(math.Ceil(float64(mean)*RebalanceThreshold + 1))
+		surplus := int32(math.Ceil(float64(mean)*rebalanceThreshold + 1))
 		stores[0].rangeCount += surplus
 		stores[0].shouldRebalanceFrom = true
 		for i := 1; i < len(stores); i++ {
@@ -537,7 +537,7 @@ func TestAllocatorRebalanceThrashing(t *testing.T) {
 		// Subtract enough ranges from the first store to make it a suitable
 		// rebalance target. To maintain the specified mean, we then add that delta
 		// back to the rest of the replicas.
-		deficit := int32(math.Ceil(float64(mean)*RebalanceThreshold + 1))
+		deficit := int32(math.Ceil(float64(mean)*rebalanceThreshold + 1))
 		stores[0].rangeCount -= deficit
 		for i := 1; i < len(stores); i++ {
 			stores[i].rangeCount += int32(math.Ceil(float64(deficit) / float64(len(stores)-1)))
diff --git a/pkg/storage/balancer.go b/pkg/storage/balancer.go
index ca8d1b93f54d..dad577d53129 100644
--- a/pkg/storage/balancer.go
+++ b/pkg/storage/balancer.go
@@ -152,9 +152,9 @@ func (rcb rangeCountBalancer) improve(sl StoreList, excluded nodeIDSet) *roachpb
 	return candidate
 }
 
-// RebalanceThreshold is the minimum ratio of a store's range surplus to the
+// rebalanceThreshold is the minimum ratio of a store's range surplus to the
 // mean range count that permits rebalances away from that store.
-var RebalanceThreshold = envutil.EnvOrDefaultFloat("COCKROACH_REBALANCE_THRESHOLD", 0.05)
+var rebalanceThreshold = envutil.EnvOrDefaultFloat("COCKROACH_REBALANCE_THRESHOLD", 0.05)
 
 func (rangeCountBalancer) shouldRebalance(store roachpb.StoreDescriptor, sl StoreList) bool {
 	// TODO(peter,bram,cuong): The FractionUsed check seems suspicious. When a
@@ -164,7 +164,7 @@ func (rangeCountBalancer) shouldRebalance(store roachpb.StoreDescriptor, sl Stor
 
 	// Rebalance if we're above the rebalance target, which is
 	// mean*(1+RebalanceThreshold).
-	target := int32(math.Ceil(sl.candidateCount.mean * (1 + RebalanceThreshold)))
+	target := int32(math.Ceil(sl.candidateCount.mean * (1 + rebalanceThreshold)))
 	rangeCountAboveTarget := store.Capacity.RangeCount > target
 
 	// Rebalance if the candidate store has a range count above the mean, and
@@ -172,7 +172,7 @@ func (rangeCountBalancer) shouldRebalance(store roachpb.StoreDescriptor, sl Stor
 	// than mean*(1-RebalanceThreshold).
 	var rebalanceToUnderfullStore bool
 	if float64(store.Capacity.RangeCount) > sl.candidateCount.mean {
-		underfullThreshold := int32(math.Floor(sl.candidateCount.mean * (1 - RebalanceThreshold)))
+		underfullThreshold := int32(math.Floor(sl.candidateCount.mean * (1 - rebalanceThreshold)))
 		for _, desc := range sl.stores {
 			if desc.Capacity.RangeCount < underfullThreshold {
 				rebalanceToUnderfullStore = true
diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go
index fdeb92e145b2..9bb54e66641c 100644
--- a/pkg/storage/client_raft_test.go
+++ b/pkg/storage/client_raft_test.go
@@ -28,7 +28,6 @@ import (
 
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
-	"github.com/kr/pretty"
 	"github.com/pkg/errors"
 	"golang.org/x/net/context"
 
@@ -957,6 +956,29 @@ func TestStoreRangeUpReplicate(t *testing.T) {
 		}
 		return nil
 	})
+
+	var generated int64
+	var normalApplied int64
+	var preemptiveApplied int64
+	for _, s := range mtc.stores {
+		m := s.Metrics()
+		generated += m.RangeSnapshotsGenerated.Count()
+		normalApplied += m.RangeSnapshotsNormalApplied.Count()
+		preemptiveApplied += m.RangeSnapshotsPreemptiveApplied.Count()
+		if n := s.ReservationCount(); n != 0 {
+			t.Fatalf("expected 0 reservations, but found %d", n)
+		}
+	}
+	if generated == 0 {
+		t.Fatalf("expected at least 1 snapshot, but found 0")
+	}
+
+	if normalApplied != 0 {
+		t.Fatalf("expected 0 normal snapshots, but found %d", normalApplied)
+	}
+	if generated != preemptiveApplied {
+		t.Fatalf("expected %d preemptive snapshots, but found %d", generated, preemptiveApplied)
+	}
 }
 
 // TestStoreRangeCorruptionChangeReplicas verifies that the replication queue
@@ -2038,132 +2060,6 @@ func TestStoreRangeRemoveDead(t *testing.T) {
 	}
 }
 
-// TestStoreRangeRebalance verifies that the replication queue will take
-// rebalancing opportunities and add a new replica on another store.
-func TestStoreRangeRebalance(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-
-	// TODO(peter,bram): clean up this test so this isn't required and unexport
-	// storage.RebalanceThreshold.
-	defer func(threshold float64) {
-		storage.RebalanceThreshold = threshold
-	}(storage.RebalanceThreshold)
-	storage.RebalanceThreshold = 0
-
-	// Start multiTestContext with replica rebalancing enabled.
-	sc := storage.TestStoreConfig(nil)
-	sc.AllocatorOptions = storage.AllocatorOptions{AllowRebalance: true}
-	mtc := &multiTestContext{storeConfig: &sc}
-
-	mtc.Start(t, 6)
-	defer mtc.Stop()
-
-	splitKey := roachpb.Key("split")
-	splitArgs := adminSplitArgs(roachpb.KeyMin, splitKey)
-	if _, err := client.SendWrapped(context.Background(), rg1(mtc.stores[0]), &splitArgs); err != nil {
-		t.Fatal(err)
-	}
-
-	// The setup for this test is to have two ranges like so:
-	// s1:r1, s2:r1r2, s3:r1, s4:r2, s5:r2, s6:-
-	// and to rebalance range 1 away from store 2 to store 6:
-	// s1:r1, s2:r2, s3:r1, s4:r2, s5:r2, s6:r1
-
-	replica1 := mtc.stores[0].LookupReplica(roachpb.RKeyMin, nil)
-	mtc.replicateRange(replica1.Desc().RangeID, 1, 2)
-
-	replica2Key := roachpb.RKey(splitKey)
-	replica2 := mtc.stores[0].LookupReplica(replica2Key, nil)
-	mtc.replicateRange(replica2.Desc().RangeID, 1, 3, 4)
-	mtc.unreplicateRange(replica2.Desc().RangeID, 0)
-
-	countReplicas := func() map[roachpb.StoreID]int {
-		counts := make(map[roachpb.StoreID]int)
-		rangeDescA := getRangeMetadata(roachpb.RKeyMin, mtc, t)
-		for _, repl := range rangeDescA.Replicas {
-			counts[repl.StoreID]++
-		}
-		rangeDescB := getRangeMetadata(replica2Key, mtc, t)
-		for _, repl := range rangeDescB.Replicas {
-			counts[repl.StoreID]++
-		}
-		return counts
-	}
-
-	// Check the initial conditions.
-	expectedStart := map[roachpb.StoreID]int{
-		roachpb.StoreID(1): 1,
-		roachpb.StoreID(2): 2,
-		roachpb.StoreID(3): 1,
-		roachpb.StoreID(4): 1,
-		roachpb.StoreID(5): 1,
-	}
-	actualStart := countReplicas()
-	if !reflect.DeepEqual(expectedStart, actualStart) {
-		t.Fatalf("replicas are not distributed as expected %s", pretty.Diff(expectedStart, actualStart))
-	}
-
-	expected := map[roachpb.StoreID]int{
-		roachpb.StoreID(1): 1,
-		roachpb.StoreID(2): 1,
-		roachpb.StoreID(3): 1,
-		roachpb.StoreID(4): 1,
-		roachpb.StoreID(5): 1,
-		roachpb.StoreID(6): 1,
-	}
-
-	mtc.initGossipNetwork()
-	util.SucceedsSoon(t, func() error {
-		// As of this writing, replicas which hold their range's lease cannot
-		// be removed; forcefully transfer the lease for range 1 to another
-		// store to allow store 2's replica to be removed.
-		if err := mtc.transferLease(replica1.RangeID, mtc.stores[0]); err != nil {
-			t.Fatal(err)
-		}
-
-		// It takes at least two passes to achieve the final result. In the
-		// first pass, we add the replica to store 6. In the second pass, we
-		// remove the replica from store 2. Note that it can also take some time
-		// for the snapshot to arrive.
-		mtc.stores[0].ForceReplicationScanAndProcess()
-
-		// Gossip the stores so that the store pools are up to date. Note that
-		// there might be a delay between the call below and the asynchronous
-		// update of the store pools.
-		mtc.gossipStores()
-
-		// Exit when all stores have a single replica.
-		actual := countReplicas()
-		if !reflect.DeepEqual(expected, actual) {
-			return errors.Errorf("replicas are not distributed as expected %s", pretty.Diff(expected, actual))
-		}
-		return nil
-	})
-
-	var generated int64
-	var normalApplied int64
-	var preemptiveApplied int64
-	for _, s := range mtc.stores {
-		m := s.Metrics()
-		generated += m.RangeSnapshotsGenerated.Count()
-		normalApplied += m.RangeSnapshotsNormalApplied.Count()
-		preemptiveApplied += m.RangeSnapshotsPreemptiveApplied.Count()
-		if n := s.ReservationCount(); n != 0 {
-			t.Fatalf("expected 0 reservations, but found %d", n)
-		}
-	}
-	if generated == 0 {
-		t.Fatalf("expected at least 1 snapshot, but found 0")
-	}
-
-	if normalApplied != 0 {
-		t.Fatalf("expected 0 normal snapshots, but found %d", normalApplied)
-	}
-	if generated != preemptiveApplied {
-		t.Fatalf("expected %d preemptive snapshots, but found %d", generated, preemptiveApplied)
-	}
-}
-
 // TestReplicateRogueRemovedNode ensures that a rogue removed node
 // (i.e. a node that has been removed from the range but doesn't know
 // it yet because it was down or partitioned away when it happened)
diff --git a/pkg/storage/client_test.go b/pkg/storage/client_test.go
index 80acc528051e..1be49843eb02 100644
--- a/pkg/storage/client_test.go
+++ b/pkg/storage/client_test.go
@@ -1053,36 +1053,6 @@ func (m *multiTestContext) getRaftLeader(rangeID roachpb.RangeID) *storage.Repli
 	return raftLeaderRepl
 }
 
-// transferLease moves the lease for the specified rangeID to the destination
-// store. The destination store must have a replica of the range on it.
-func (m *multiTestContext) transferLease(rangeID roachpb.RangeID, destStore *storage.Store) error {
-	destReplica, err := destStore.GetReplica(rangeID)
-	if err != nil {
-		return err
-	}
-	origLeasePtr, _ := destReplica.GetLease()
-	if origLeasePtr == nil {
-		return errors.Errorf("could not get lease ptr from replica %s", destReplica)
-	}
-	originalStoreID := origLeasePtr.Replica.StoreID
-
-	// Get the replica that currently holds the lease.
-	var origStore *storage.Store
-	for _, store := range m.stores {
-		if store.Ident.StoreID == originalStoreID {
-			origStore = store
-			break
-		}
-	}
-
-	origRepl, err := origStore.GetReplica(destReplica.RangeID)
-	if err != nil {
-		return err
-	}
-
-	return origRepl.AdminTransferLease(destStore.Ident.StoreID)
-}
-
 // getArgs returns a GetRequest and GetResponse pair addressed to
 // the default replica for the specified key.
 func getArgs(key roachpb.Key) roachpb.GetRequest {
diff --git a/pkg/storage/replicate_queue_test.go b/pkg/storage/replicate_queue_test.go
new file mode 100644
index 000000000000..9ec317cc5d1c
--- /dev/null
+++ b/pkg/storage/replicate_queue_test.go
@@ -0,0 +1,99 @@
+// Copyright 2016 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+//
+// Author: Peter Mattis
+
+package storage_test
+
+import (
+	"os"
+	"testing"
+
+	"golang.org/x/net/context"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/storage"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+	"github.com/cockroachdb/cockroach/pkg/util"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/pkg/errors"
+)
+
+func TestReplicateQueueRebalance(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	// Set the gossip stores interval lower to speed up rebalancing. With the
+	// default of 5s we have to wait ~5s for the rebalancing to start.
+	if err := os.Setenv("COCKROACH_GOSSIP_STORES_INTERVAL", "100ms"); err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		if err := os.Unsetenv("COCKROACH_GOSSIP_STORES_INTERVAL"); err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	// TODO(peter): Bump this to 10 nodes. Doing so is flaky (we encounter a
+	// situation where 1 node never receives a replica). Need to figure out why.
+	tc := testcluster.StartTestCluster(t, 5,
+		base.TestClusterArgs{ReplicationMode: base.ReplicationAuto},
+	)
+	defer tc.Stopper().Stop()
+
+	// Create a handful of ranges. Along with the initial ranges in the cluster,
+	// this will result in 15 total ranges and 45 total replicas. Spread across
+	// the 5 nodes in the cluster the average is 9 replicas per node.
+	for i := 0; i < 10; i++ {
+		tableID := keys.MaxReservedDescID + i + 1
+		splitKey := keys.MakeRowSentinelKey(keys.MakeTablePrefix(uint32(tableID)))
+		for {
+			if _, _, err := tc.SplitRange(splitKey); err != nil {
+				if testutils.IsError(err, "split at key .* failed: conflict updating range descriptors") {
+					continue
+				}
+				t.Fatal(err)
+			}
+			break
+		}
+	}
+
+	countReplicas := func() []int {
+		counts := make([]int, len(tc.Servers))
+		for i, s := range tc.Servers {
+			err := s.Stores().VisitStores(func(s *storage.Store) error {
+				counts[i] += s.ReplicaCount()
+				return nil
+			})
+			if err != nil {
+				t.Fatal(err)
+			}
+		}
+		return counts
+	}
+
+	util.SucceedsSoon(t, func() error {
+		counts := countReplicas()
+		for _, c := range counts {
+			if c == 0 {
+				err := errors.Errorf("not balanced: %d", counts)
+				log.Info(context.Background(), err)
+				return err
+			}
+		}
+		return nil
+	})
+}
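Reviewer note, not part of the patch: the threshold arithmetic this rename touches is easy to get backwards, so below is a minimal standalone sketch of the overfull/underfull cutoffs that rangeCountBalancer.shouldRebalance derives from rebalanceThreshold. The cutoffs helper, the main function, and the sample mean of 9 are illustrative assumptions for this note only, not code from this repository.

package main

import (
	"fmt"
	"math"
)

// rebalanceThreshold mirrors the default used in balancer.go
// (COCKROACH_REBALANCE_THRESHOLD, 0.05).
const rebalanceThreshold = 0.05

// cutoffs applies the same math.Ceil/math.Floor expressions as
// shouldRebalance: a store whose range count exceeds overfullTarget is a
// candidate to rebalance away from, and a store whose range count is below
// underfullThreshold is considered underfull.
func cutoffs(mean float64) (overfullTarget, underfullThreshold int32) {
	overfullTarget = int32(math.Ceil(mean * (1 + rebalanceThreshold)))
	underfullThreshold = int32(math.Floor(mean * (1 - rebalanceThreshold)))
	return overfullTarget, underfullThreshold
}

func main() {
	// A mean of 9 replicas per store is roughly the steady state the new
	// TestReplicateQueueRebalance converges toward (45 replicas on 5 nodes).
	over, under := cutoffs(9)
	fmt.Printf("overfull above %d, underfull below %d\n", over, under)
}

With the default threshold of 0.05 this prints "overfull above 10, underfull below 8". Exceeding ceil(mean*1.05) is one way a store becomes a rebalance source; shouldRebalance will also rebalance away from a store that is merely above the mean when some other store's range count falls below floor(mean*0.95).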