Skip to content

Commit

Permalink
storage: transfer leases on overfull stores
Browse files Browse the repository at this point in the history
If the remove-target is the lease holder, transfer the lease to another
store to allow removal of the replica from the overfull store.

Fixes #9462.
  • Loading branch information
petermattis committed Sep 20, 2016
1 parent d899c39 commit 5973a12
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 19 deletions.
54 changes: 54 additions & 0 deletions storage/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package storage

import (
"fmt"
"math"
"math/rand"

"golang.org/x/net/context"
Expand Down Expand Up @@ -317,6 +318,50 @@ func (a Allocator) RebalanceTarget(
return a.improve(sl, existingNodes)
}

// TransferLeaseTarget returns a suitable replica to transfer the range lease
// to from the provided list. It excludes the current lease holder replica.
func (a *Allocator) TransferLeaseTarget(
existing []roachpb.ReplicaDescriptor,
leaseStoreID roachpb.StoreID,
) roachpb.ReplicaDescriptor {
bestIdx := -1
bestRangeCount := int32(math.MaxInt32)
for i, repl := range existing {
if leaseStoreID == repl.StoreID {
continue
}
storeDesc, ok := a.storePool.getStoreDescriptor(repl.StoreID)
if !ok {
continue
}
if bestRangeCount > storeDesc.Capacity.RangeCount {
bestRangeCount = storeDesc.Capacity.RangeCount
bestIdx = i
}
}
if bestIdx == -1 {
return roachpb.ReplicaDescriptor{}
}
return existing[bestIdx]
}

// TransferLeaseSource returns true if the specified store is overfull with
// respect to the other stores matching the specified attributes.
func (a *Allocator) TransferLeaseSource(
required roachpb.Attributes,
leaseStoreID roachpb.StoreID,
) bool {
storeDesc, ok := a.storePool.getStoreDescriptor(leaseStoreID)
if !ok {
return false
}
sl, _, _ := a.storePool.getStoreList(required, a.options.Deterministic)
if log.V(3) {
log.Infof(context.TODO(), "transfer-lease-source (lease-holder=%d):\n%s", leaseStoreID, sl)
}
return a.overfull(storeDesc, sl)
}

// ShouldRebalance returns whether the specified store should attempt to
// rebalance a replica to another store.
//
Expand Down Expand Up @@ -370,6 +415,15 @@ func (a Allocator) shouldRebalance(
return rcb.shouldRebalance(store, sl)
}

// overfull returns whether the specified store is overfull with respect to the
// given candidate store list.
func (a Allocator) overfull(
store roachpb.StoreDescriptor, sl StoreList,
) bool {
rcb := rangeCountBalancer{a.randGen}
return rcb.overfull(store, sl)
}

// computeQuorum computes the quorum value for the given number of nodes.
func computeQuorum(nodes int) int {
return (nodes / 2) + 1
Expand Down
52 changes: 52 additions & 0 deletions storage/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,58 @@ func TestAllocatorRemoveTarget(t *testing.T) {
}
}

func TestAllocatorTransferLeaseTarget(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper, g, _, a, _ := createTestAllocator()
defer stopper.Stop()

// 3 stores where the range count for each store is equal to the store
// ID. This makes the store ID equivalent to the preference for that store as
// a lease transfer target.
var stores []*roachpb.StoreDescriptor
for i := 1; i <= 3; i++ {
stores = append(stores, &roachpb.StoreDescriptor{
StoreID: roachpb.StoreID(i),
Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)},
Capacity: roachpb.StoreCapacity{RangeCount: int32(i)},
})
}
sg := gossiputil.NewStoreGossiper(g)
sg.GossipStores(stores, t)

makeReplicaDescs := func(storeIDs ...roachpb.StoreID) []roachpb.ReplicaDescriptor {
var res []roachpb.ReplicaDescriptor
for _, id := range storeIDs {
res = append(res, roachpb.ReplicaDescriptor{
StoreID: id,
})
}
return res
}

testCases := []struct {
existing []roachpb.ReplicaDescriptor
leaseStoreID roachpb.StoreID
expectedTarget roachpb.StoreID
}{
// No existing lease holder, prefer lease loaded replica.
{makeReplicaDescs(1, 2, 3), 0, 1},
// No existing lease holder, prefer lease loaded replica (existing order
// doesn't matter).
{makeReplicaDescs(3, 2, 1), 0, 1},
// Store 1 is the lease holder.
{makeReplicaDescs(1, 2, 3), 1, 2},
// Store 2 is the lease holder.
{makeReplicaDescs(1, 2, 3), 2, 1},
}
for i, c := range testCases {
target := a.TransferLeaseTarget(c.existing, c.leaseStoreID)
if c.expectedTarget != target.StoreID {
t.Fatalf("%d: expected %d, but found %d", i, c.expectedTarget, target.StoreID)
}
}
}

func TestAllocatorComputeAction(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper, _, sp, a, _ := createTestAllocator()
Expand Down
23 changes: 15 additions & 8 deletions storage/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,7 @@ func (rcb rangeCountBalancer) shouldRebalance(
// for rebalancing. This is currently utilized by tests.
maxCapacityUsed := store.Capacity.FractionUsed() >= maxFractionUsedThreshold

// Rebalance if we're above the rebalance target, which is
// mean*(1+rebalanceThreshold).
target := int32(math.Ceil(sl.candidateCount.mean * (1 + rebalanceThreshold)))
rangeCountAboveTarget := store.Capacity.RangeCount > target
rebalanceFromOverfullStore := rcb.overfull(store, sl)

// Rebalance if the candidate store has a range count above the mean, and
// there exists another store that is underfull: its range count is smaller
Expand All @@ -182,18 +179,28 @@ func (rcb rangeCountBalancer) shouldRebalance(
math.Abs(float64(store.Capacity.RangeCount)-sl.candidateCount.mean))

shouldRebalance :=
(maxCapacityUsed || rangeCountAboveTarget || rebalanceToUnderfullStore) && rebalanceConvergesOnMean
(maxCapacityUsed || rebalanceFromOverfullStore || rebalanceToUnderfullStore) &&
rebalanceConvergesOnMean
if log.V(2) {
log.Infof(context.TODO(),
"%d: should-rebalance=%t: fraction-used=%.2f range-count=%d "+
"(mean=%.1f, target=%d, fraction-used=%t, above-target=%t, underfull=%t, converges=%t)",
"(mean=%.1f, fraction-used=%t, overfull=%t, underfull=%t, converges=%t)",
store.StoreID, shouldRebalance, store.Capacity.FractionUsed(),
store.Capacity.RangeCount, sl.candidateCount.mean, target,
maxCapacityUsed, rangeCountAboveTarget, rebalanceToUnderfullStore, rebalanceConvergesOnMean)
store.Capacity.RangeCount, sl.candidateCount.mean, maxCapacityUsed,
rebalanceFromOverfullStore, rebalanceToUnderfullStore, rebalanceConvergesOnMean)
}
return shouldRebalance
}

func (rcb rangeCountBalancer) overfull(
store roachpb.StoreDescriptor, sl StoreList,
) bool {
// Rebalance if we're above the rebalance target, which is
// mean*(1+rebalanceThreshold).
overfullThreshold := int32(math.Ceil(sl.candidateCount.mean * (1 + rebalanceThreshold)))
return store.Capacity.RangeCount > overfullThreshold
}

// selectRandom chooses up to count random store descriptors from the given
// store list, excluding any stores that are too full to accept more replicas.
func selectRandom(
Expand Down
10 changes: 8 additions & 2 deletions storage/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,10 @@ func TestStoreZoneUpdateAndRangeSplit(t *testing.T) {
const maxBytes = 1 << 16
// Set max bytes.
descID := uint32(keys.MaxReservedDescID + 1)
config.TestingSetZoneConfig(descID, config.ZoneConfig{RangeMaxBytes: maxBytes})
config.TestingSetZoneConfig(descID, config.ZoneConfig{
ReplicaAttrs: []roachpb.Attributes{{}},
RangeMaxBytes: maxBytes,
})

// Trigger gossip callback.
if err := store.Gossip().AddInfoProto(gossip.KeySystemConfig, &config.SystemConfig{}, 0); err != nil {
Expand Down Expand Up @@ -753,7 +756,10 @@ func TestStoreRangeSplitWithMaxBytesUpdate(t *testing.T) {
// Set max bytes.
const maxBytes = 1 << 16
descID := uint32(keys.MaxReservedDescID + 1)
config.TestingSetZoneConfig(descID, config.ZoneConfig{RangeMaxBytes: maxBytes})
config.TestingSetZoneConfig(descID, config.ZoneConfig{
ReplicaAttrs: []roachpb.Attributes{{}},
RangeMaxBytes: maxBytes,
})

// Trigger gossip callback.
if err := store.Gossip().AddInfoProto(gossip.KeySystemConfig, &config.SystemConfig{}, 0); err != nil {
Expand Down
30 changes: 21 additions & 9 deletions storage/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,19 +160,31 @@ func (rq *replicateQueue) process(
}
case AllocatorRemove:
log.Event(ctx, "removing a replica")
// We require the lease in order to process replicas, so
// repl.store.StoreID() corresponds to the lease-holder's store ID.
removeReplica, err := rq.allocator.RemoveTarget(desc.Replicas, repl.store.StoreID())
if err != nil {
return err
// If the lease holder is on an overfull store allow transferring the lease
// away.
leaseHolderID := repl.store.StoreID()
if rq.allocator.TransferLeaseSource(zone.ReplicaAttrs[0], leaseHolderID) {
leaseHolderID = 0
}
log.VEventf(1, ctx, "removing replica %+v due to over-replication", removeReplica)
if err = repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, removeReplica, desc); err != nil {
removeReplica, err := rq.allocator.RemoveTarget(desc.Replicas, leaseHolderID)
if err != nil {
return err
}
// Do not requeue if we removed ourselves.
if removeReplica.StoreID == repl.store.StoreID() {
return nil
target := rq.allocator.TransferLeaseTarget(desc.Replicas, repl.store.StoreID())
if target.StoreID != 0 {
log.VEventf(1, ctx, "transferring lease to s%d", target.StoreID)
if err := repl.AdminTransferLease(target.StoreID); err != nil {
return errors.Wrapf(err, "%s: unable to transfer lease", repl)
}
// Do not requeue as we transferred our lease away.
return nil
}
} else {
log.VEventf(1, ctx, "removing replica %+v due to over-replication", removeReplica)
if err = repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, removeReplica, desc); err != nil {
return err
}
}
case AllocatorRemoveDead:
log.Event(ctx, "removing a dead replica")
Expand Down

0 comments on commit 5973a12

Please sign in to comment.