From 5973a126dd74e6091afa2f39f2e2160ca3d1feac Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Mon, 19 Sep 2016 17:30:44 -0400 Subject: [PATCH] storage: transfer leases on overfull stores If the remove-target is the lease holder, transfer the lease to another store to allow removal of the replica from the overfull store. Fixes #9462. --- storage/allocator.go | 54 ++++++++++++++++++++++++++++++++++++ storage/allocator_test.go | 52 ++++++++++++++++++++++++++++++++++ storage/balancer.go | 23 +++++++++------ storage/client_split_test.go | 10 +++++-- storage/replicate_queue.go | 30 ++++++++++++++------ 5 files changed, 150 insertions(+), 19 deletions(-) diff --git a/storage/allocator.go b/storage/allocator.go index 5df8c731d8c9..fdca8e718903 100644 --- a/storage/allocator.go +++ b/storage/allocator.go @@ -20,6 +20,7 @@ package storage import ( "fmt" + "math" "math/rand" "golang.org/x/net/context" @@ -317,6 +318,50 @@ func (a Allocator) RebalanceTarget( return a.improve(sl, existingNodes) } +// TransferLeaseTarget returns a suitable replica to transfer the range lease +// to from the provided list. It excludes the current lease holder replica. +func (a *Allocator) TransferLeaseTarget( + existing []roachpb.ReplicaDescriptor, + leaseStoreID roachpb.StoreID, +) roachpb.ReplicaDescriptor { + bestIdx := -1 + bestRangeCount := int32(math.MaxInt32) + for i, repl := range existing { + if leaseStoreID == repl.StoreID { + continue + } + storeDesc, ok := a.storePool.getStoreDescriptor(repl.StoreID) + if !ok { + continue + } + if bestRangeCount > storeDesc.Capacity.RangeCount { + bestRangeCount = storeDesc.Capacity.RangeCount + bestIdx = i + } + } + if bestIdx == -1 { + return roachpb.ReplicaDescriptor{} + } + return existing[bestIdx] +} + +// TransferLeaseSource returns true if the specified store is overfull with +// respect to the other stores matching the specified attributes. +func (a *Allocator) TransferLeaseSource( + required roachpb.Attributes, + leaseStoreID roachpb.StoreID, +) bool { + storeDesc, ok := a.storePool.getStoreDescriptor(leaseStoreID) + if !ok { + return false + } + sl, _, _ := a.storePool.getStoreList(required, a.options.Deterministic) + if log.V(3) { + log.Infof(context.TODO(), "transfer-lease-source (lease-holder=%d):\n%s", leaseStoreID, sl) + } + return a.overfull(storeDesc, sl) +} + // ShouldRebalance returns whether the specified store should attempt to // rebalance a replica to another store. // @@ -370,6 +415,15 @@ func (a Allocator) shouldRebalance( return rcb.shouldRebalance(store, sl) } +// overfull returns whether the specified store is overfull with respect to the +// given candidate store list. +func (a Allocator) overfull( + store roachpb.StoreDescriptor, sl StoreList, +) bool { + rcb := rangeCountBalancer{a.randGen} + return rcb.overfull(store, sl) +} + // computeQuorum computes the quorum value for the given number of nodes. func computeQuorum(nodes int) int { return (nodes / 2) + 1 diff --git a/storage/allocator_test.go b/storage/allocator_test.go index 794711617359..f17af59f2e41 100644 --- a/storage/allocator_test.go +++ b/storage/allocator_test.go @@ -714,6 +714,58 @@ func TestAllocatorRemoveTarget(t *testing.T) { } } +func TestAllocatorTransferLeaseTarget(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper, g, _, a, _ := createTestAllocator() + defer stopper.Stop() + + // 3 stores where the range count for each store is equal to the store + // ID. This makes the store ID equivalent to the preference for that store as + // a lease transfer target. 
+	var stores []*roachpb.StoreDescriptor
+	for i := 1; i <= 3; i++ {
+		stores = append(stores, &roachpb.StoreDescriptor{
+			StoreID:  roachpb.StoreID(i),
+			Node:     roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)},
+			Capacity: roachpb.StoreCapacity{RangeCount: int32(i)},
+		})
+	}
+	sg := gossiputil.NewStoreGossiper(g)
+	sg.GossipStores(stores, t)
+
+	makeReplicaDescs := func(storeIDs ...roachpb.StoreID) []roachpb.ReplicaDescriptor {
+		var res []roachpb.ReplicaDescriptor
+		for _, id := range storeIDs {
+			res = append(res, roachpb.ReplicaDescriptor{
+				StoreID: id,
+			})
+		}
+		return res
+	}
+
+	testCases := []struct {
+		existing       []roachpb.ReplicaDescriptor
+		leaseStoreID   roachpb.StoreID
+		expectedTarget roachpb.StoreID
+	}{
+		// No existing lease holder, prefer least loaded replica.
+		{makeReplicaDescs(1, 2, 3), 0, 1},
+		// No existing lease holder, prefer least loaded replica (existing order
+		// doesn't matter).
+		{makeReplicaDescs(3, 2, 1), 0, 1},
+		// Store 1 is the lease holder.
+		{makeReplicaDescs(1, 2, 3), 1, 2},
+		// Store 2 is the lease holder.
+		{makeReplicaDescs(1, 2, 3), 2, 1},
+	}
+	for i, c := range testCases {
+		target := a.TransferLeaseTarget(c.existing, c.leaseStoreID)
+		if c.expectedTarget != target.StoreID {
+			t.Fatalf("%d: expected %d, but found %d", i, c.expectedTarget, target.StoreID)
+		}
+	}
+}
+
 func TestAllocatorComputeAction(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	stopper, _, sp, a, _ := createTestAllocator()
diff --git a/storage/balancer.go b/storage/balancer.go
index 24f2c9760abe..2870106ec120 100644
--- a/storage/balancer.go
+++ b/storage/balancer.go
@@ -155,10 +155,7 @@ func (rcb rangeCountBalancer) shouldRebalance(
 	// for rebalancing. This is currently utilized by tests.
 	maxCapacityUsed := store.Capacity.FractionUsed() >= maxFractionUsedThreshold
 
-	// Rebalance if we're above the rebalance target, which is
-	// mean*(1+rebalanceThreshold).
-	target := int32(math.Ceil(sl.candidateCount.mean * (1 + rebalanceThreshold)))
-	rangeCountAboveTarget := store.Capacity.RangeCount > target
+	rebalanceFromOverfullStore := rcb.overfull(store, sl)
 
 	// Rebalance if the candidate store has a range count above the mean, and
 	// there exists another store that is underfull: its range count is smaller
@@ -182,18 +179,28 @@ func (rcb rangeCountBalancer) shouldRebalance(
 		math.Abs(float64(store.Capacity.RangeCount)-sl.candidateCount.mean))
 
 	shouldRebalance :=
-		(maxCapacityUsed || rangeCountAboveTarget || rebalanceToUnderfullStore) && rebalanceConvergesOnMean
+		(maxCapacityUsed || rebalanceFromOverfullStore || rebalanceToUnderfullStore) &&
+			rebalanceConvergesOnMean
 	if log.V(2) {
 		log.Infof(context.TODO(),
 			"%d: should-rebalance=%t: fraction-used=%.2f range-count=%d "+
-				"(mean=%.1f, target=%d, fraction-used=%t, above-target=%t, underfull=%t, converges=%t)",
+				"(mean=%.1f, fraction-used=%t, overfull=%t, underfull=%t, converges=%t)",
 			store.StoreID, shouldRebalance, store.Capacity.FractionUsed(),
-			store.Capacity.RangeCount, sl.candidateCount.mean, target,
-			maxCapacityUsed, rangeCountAboveTarget, rebalanceToUnderfullStore, rebalanceConvergesOnMean)
+			store.Capacity.RangeCount, sl.candidateCount.mean, maxCapacityUsed,
+			rebalanceFromOverfullStore, rebalanceToUnderfullStore, rebalanceConvergesOnMean)
 	}
 	return shouldRebalance
 }
 
+func (rcb rangeCountBalancer) overfull(
+	store roachpb.StoreDescriptor, sl StoreList,
+) bool {
+	// Rebalance if we're above the rebalance target, which is
+	// mean*(1+rebalanceThreshold).
+ overfullThreshold := int32(math.Ceil(sl.candidateCount.mean * (1 + rebalanceThreshold))) + return store.Capacity.RangeCount > overfullThreshold +} + // selectRandom chooses up to count random store descriptors from the given // store list, excluding any stores that are too full to accept more replicas. func selectRandom( diff --git a/storage/client_split_test.go b/storage/client_split_test.go index 29240131c697..fe731dc0e962 100644 --- a/storage/client_split_test.go +++ b/storage/client_split_test.go @@ -696,7 +696,10 @@ func TestStoreZoneUpdateAndRangeSplit(t *testing.T) { const maxBytes = 1 << 16 // Set max bytes. descID := uint32(keys.MaxReservedDescID + 1) - config.TestingSetZoneConfig(descID, config.ZoneConfig{RangeMaxBytes: maxBytes}) + config.TestingSetZoneConfig(descID, config.ZoneConfig{ + ReplicaAttrs: []roachpb.Attributes{{}}, + RangeMaxBytes: maxBytes, + }) // Trigger gossip callback. if err := store.Gossip().AddInfoProto(gossip.KeySystemConfig, &config.SystemConfig{}, 0); err != nil { @@ -753,7 +756,10 @@ func TestStoreRangeSplitWithMaxBytesUpdate(t *testing.T) { // Set max bytes. const maxBytes = 1 << 16 descID := uint32(keys.MaxReservedDescID + 1) - config.TestingSetZoneConfig(descID, config.ZoneConfig{RangeMaxBytes: maxBytes}) + config.TestingSetZoneConfig(descID, config.ZoneConfig{ + ReplicaAttrs: []roachpb.Attributes{{}}, + RangeMaxBytes: maxBytes, + }) // Trigger gossip callback. if err := store.Gossip().AddInfoProto(gossip.KeySystemConfig, &config.SystemConfig{}, 0); err != nil { diff --git a/storage/replicate_queue.go b/storage/replicate_queue.go index 58ba30a9b5a5..7f9faf1063fa 100644 --- a/storage/replicate_queue.go +++ b/storage/replicate_queue.go @@ -160,19 +160,31 @@ func (rq *replicateQueue) process( } case AllocatorRemove: log.Event(ctx, "removing a replica") - // We require the lease in order to process replicas, so - // repl.store.StoreID() corresponds to the lease-holder's store ID. - removeReplica, err := rq.allocator.RemoveTarget(desc.Replicas, repl.store.StoreID()) - if err != nil { - return err + // If the lease holder is on an overfull store allow transferring the lease + // away. + leaseHolderID := repl.store.StoreID() + if rq.allocator.TransferLeaseSource(zone.ReplicaAttrs[0], leaseHolderID) { + leaseHolderID = 0 } - log.VEventf(1, ctx, "removing replica %+v due to over-replication", removeReplica) - if err = repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, removeReplica, desc); err != nil { + removeReplica, err := rq.allocator.RemoveTarget(desc.Replicas, leaseHolderID) + if err != nil { return err } - // Do not requeue if we removed ourselves. if removeReplica.StoreID == repl.store.StoreID() { - return nil + target := rq.allocator.TransferLeaseTarget(desc.Replicas, repl.store.StoreID()) + if target.StoreID != 0 { + log.VEventf(1, ctx, "transferring lease to s%d", target.StoreID) + if err := repl.AdminTransferLease(target.StoreID); err != nil { + return errors.Wrapf(err, "%s: unable to transfer lease", repl) + } + // Do not requeue as we transferred our lease away. + return nil + } + } else { + log.VEventf(1, ctx, "removing replica %+v due to over-replication", removeReplica) + if err = repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, removeReplica, desc); err != nil { + return err + } } case AllocatorRemoveDead: log.Event(ctx, "removing a dead replica")
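
A minimal standalone sketch of the selection logic this patch adds, for illustration only: it is not part of the patch, and the storeID/storeDesc types, the helper names, and the 0.05 threshold below are simplified stand-ins for the real roachpb descriptors, StorePool, and rangeCountBalancer plumbing.

package main

import (
	"fmt"
	"math"
)

// Simplified stand-ins for the roachpb descriptors used by the patch; the
// real types carry node, attribute, and capacity details beyond a range count.
type storeID int32

type storeDesc struct {
	id         storeID
	rangeCount int32
}

// overfull mirrors rangeCountBalancer.overfull: a store is overfull when its
// range count exceeds mean*(1+rebalanceThreshold) over the candidate stores.
func overfull(s storeDesc, candidates []storeDesc, rebalanceThreshold float64) bool {
	var sum int32
	for _, c := range candidates {
		sum += c.rangeCount
	}
	mean := float64(sum) / float64(len(candidates))
	threshold := int32(math.Ceil(mean * (1 + rebalanceThreshold)))
	return s.rangeCount > threshold
}

// transferLeaseTarget mirrors Allocator.TransferLeaseTarget: pick the replica
// whose store holds the fewest ranges, skipping the current lease holder.
// A zero storeID means no suitable target was found.
func transferLeaseTarget(existing []storeDesc, leaseHolder storeID) storeID {
	best := storeID(0)
	bestRangeCount := int32(math.MaxInt32)
	for _, s := range existing {
		if s.id == leaseHolder {
			continue
		}
		if s.rangeCount < bestRangeCount {
			bestRangeCount = s.rangeCount
			best = s.id
		}
	}
	return best
}

func main() {
	stores := []storeDesc{{1, 1}, {2, 2}, {3, 6}}
	// Store 3 holds the lease and is the most loaded store.
	fmt.Println(transferLeaseTarget(stores, 3))    // 1: the least loaded store
	fmt.Println(overfull(stores[2], stores, 0.05)) // true: 6 > ceil(3 * 1.05)
}

Run as an ordinary Go program it prints 1 and true: with stores carrying 1, 2, and 6 ranges, the store with 6 ranges is over the threshold (mean 3, ceil(3*1.05) = 4), and when that store also holds the lease the lease moves to the least loaded remaining store, which is the same decision the replicate queue change above makes before removing a replica.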