Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: transfer leases on overfull stores #9465

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 77 additions & 51 deletions pkg/roachpb/metadata.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/roachpb/metadata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ message StoreCapacity {
optional int64 capacity = 1 [(gogoproto.nullable) = false];
optional int64 available = 2 [(gogoproto.nullable) = false];
optional int32 range_count = 3 [(gogoproto.nullable) = false];
optional int32 lease_holder_count = 4 [(gogoproto.nullable) = false];
}

// NodeDescriptor holds details on node physical/network topology.
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand Down Expand Up @@ -685,6 +686,7 @@ func TestAdminAPIZoneDetails(t *testing.T) {

// Apply zone configuration to database and check again.
dbZone := config.ZoneConfig{
ReplicaAttrs: []roachpb.Attributes{{}},
RangeMinBytes: 456,
}
setZone(dbZone, idPath[1])
Expand All @@ -693,6 +695,7 @@ func TestAdminAPIZoneDetails(t *testing.T) {

// Apply zone configuration to table and check again.
tblZone := config.ZoneConfig{
ReplicaAttrs: []roachpb.Attributes{{}},
RangeMinBytes: 789,
}
setZone(tblZone, idPath[2])
Expand Down
66 changes: 66 additions & 0 deletions pkg/storage/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package storage

import (
"fmt"
"math"
"math/rand"

"golang.org/x/net/context"
Expand Down Expand Up @@ -329,6 +330,71 @@ func (a Allocator) RebalanceTarget(
return a.improve(sl, existingNodes)
}

// TransferLeaseTarget returns a suitable replica to transfer the range lease
// to from the provided list. It excludes the current lease holder replica.
func (a *Allocator) TransferLeaseTarget(
existing []roachpb.ReplicaDescriptor,
leaseStoreID roachpb.StoreID,
) roachpb.ReplicaDescriptor {
var bestDesc roachpb.ReplicaDescriptor
bestLeaseCount := int32(math.MaxInt32)
for _, repl := range existing {
if leaseStoreID == repl.StoreID {
continue
}
storeDesc, ok := a.storePool.getStoreDescriptor(repl.StoreID)
if !ok {
continue
}
if bestLeaseCount > storeDesc.Capacity.LeaseHolderCount {
bestLeaseCount = storeDesc.Capacity.LeaseHolderCount
bestDesc = repl
}
}
return bestDesc
}

// TransferLeaseSource returns true if the specified store is overfull with
// respect to the other stores matching the specified attributes.
func (a *Allocator) TransferLeaseSource(
required config.Constraints,
leaseStoreID roachpb.StoreID,
) bool {
store, ok := a.storePool.getStoreDescriptor(leaseStoreID)
if !ok {
return false
}
sl, _, _ := a.storePool.getStoreList(required, a.options.Deterministic)
if log.V(3) {
log.Infof(context.TODO(), "transfer-lease-source (lease-holder=%d):\n%s", leaseStoreID, sl)
}

// Allow lease transfer if we're above the overfull threshold, which is
// mean*(1+rebalanceThreshold).
overfullCountThreshold := int32(math.Ceil(sl.candidateCount.mean * (1 + RebalanceThreshold)))
if store.Capacity.RangeCount <= overfullCountThreshold {
return false
}
overfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 + RebalanceThreshold)))
return store.Capacity.LeaseHolderCount > overfullLeaseThreshold
}

// ShouldRebalance returns whether the specified store should attempt to
// rebalance a replica to another store.
//
// TODO(bram): This is only used by the simulator and should be removed.
func (a *Allocator) ShouldRebalance(storeID roachpb.StoreID) bool {
if !a.options.AllowRebalance {
return false
}
desc, ok := a.storePool.getStoreDescriptor(storeID)
if !ok {
return false
}
sl, _, _ := a.storePool.getStoreList(config.Constraints{}, true)
return a.shouldRebalance(desc, sl)
}

// selectGood attempts to select a store from the supplied store list that it
// considers to be 'Good' relative to the other stores in the list. Any nodes
// in the supplied 'exclude' list will be disqualified from selection. Returns
Expand Down
52 changes: 52 additions & 0 deletions pkg/storage/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,58 @@ func TestAllocatorRemoveTarget(t *testing.T) {
}
}

func TestAllocatorTransferLeaseTarget(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper, g, _, a, _ := createTestAllocator()
defer stopper.Stop()

// 3 stores where the range count for each store is equal to the store
// ID. This makes the store ID equivalent to the preference for that store as
// a lease transfer target.
var stores []*roachpb.StoreDescriptor
for i := 1; i <= 3; i++ {
stores = append(stores, &roachpb.StoreDescriptor{
StoreID: roachpb.StoreID(i),
Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)},
Capacity: roachpb.StoreCapacity{RangeCount: int32(i)},
})
}
sg := gossiputil.NewStoreGossiper(g)
sg.GossipStores(stores, t)

makeReplicaDescs := func(storeIDs ...roachpb.StoreID) []roachpb.ReplicaDescriptor {
var res []roachpb.ReplicaDescriptor
for _, id := range storeIDs {
res = append(res, roachpb.ReplicaDescriptor{
StoreID: id,
})
}
return res
}

testCases := []struct {
existing []roachpb.ReplicaDescriptor
leaseStoreID roachpb.StoreID
expectedTarget roachpb.StoreID
}{
// No existing lease holder, prefer the least loaded replica.
{makeReplicaDescs(1, 2, 3), 0, 1},
// No existing lease holder, prefer the least loaded replica (existing
// order doesn't matter).
{makeReplicaDescs(3, 2, 1), 0, 1},
// Store 1 is the lease holder.
{makeReplicaDescs(1, 2, 3), 1, 2},
// Store 2 is the lease holder.
{makeReplicaDescs(1, 2, 3), 2, 1},
}
for i, c := range testCases {
target := a.TransferLeaseTarget(c.existing, c.leaseStoreID)
if c.expectedTarget != target.StoreID {
t.Fatalf("%d: expected %d, but found %d", i, c.expectedTarget, target.StoreID)
}
}
}

func TestAllocatorComputeAction(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper, _, sp, a, _ := createTestAllocator()
Expand Down
15 changes: 8 additions & 7 deletions pkg/storage/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ func (rangeCountBalancer) shouldRebalance(store roachpb.StoreDescriptor, sl Stor
maxCapacityUsed := store.Capacity.FractionUsed() >= maxFractionUsedThreshold

// Rebalance if we're above the rebalance target, which is
// mean*(1+RebalanceThreshold).
target := int32(math.Ceil(sl.candidateCount.mean * (1 + RebalanceThreshold)))
rangeCountAboveTarget := store.Capacity.RangeCount > target
// mean*(1+rebalanceThreshold).
overfullThreshold := int32(math.Ceil(sl.candidateCount.mean * (1 + RebalanceThreshold)))
rebalanceFromOverfullStore := store.Capacity.RangeCount > overfullThreshold

// Rebalance if the candidate store has a range count above the mean, and
// there exists another store that is underfull: its range count is smaller
Expand All @@ -175,14 +175,15 @@ func (rangeCountBalancer) shouldRebalance(store roachpb.StoreDescriptor, sl Stor
rebalanceConvergesOnMean := float64(store.Capacity.RangeCount) > sl.candidateCount.mean+0.5

shouldRebalance :=
(maxCapacityUsed || rangeCountAboveTarget || rebalanceToUnderfullStore) && rebalanceConvergesOnMean
(maxCapacityUsed || rebalanceFromOverfullStore || rebalanceToUnderfullStore) &&
rebalanceConvergesOnMean
if log.V(2) {
log.Infof(context.TODO(),
"%d: should-rebalance=%t: fraction-used=%.2f range-count=%d "+
"(mean=%.1f, target=%d, fraction-used=%t, above-target=%t, underfull=%t, converges=%t)",
"(mean=%.1f, fraction-used=%t, overfull=%t, underfull=%t, converges=%t)",
store.StoreID, shouldRebalance, store.Capacity.FractionUsed(),
store.Capacity.RangeCount, sl.candidateCount.mean, target,
maxCapacityUsed, rangeCountAboveTarget, rebalanceToUnderfullStore, rebalanceConvergesOnMean)
store.Capacity.RangeCount, sl.candidateCount.mean, maxCapacityUsed,
rebalanceFromOverfullStore, rebalanceToUnderfullStore, rebalanceConvergesOnMean)
}
return shouldRebalance
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/storage/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,10 @@ func TestStoreZoneUpdateAndRangeSplit(t *testing.T) {
const maxBytes = 1 << 16
// Set max bytes.
descID := uint32(keys.MaxReservedDescID + 1)
config.TestingSetZoneConfig(descID, config.ZoneConfig{RangeMaxBytes: maxBytes})
config.TestingSetZoneConfig(descID, config.ZoneConfig{
ReplicaAttrs: []roachpb.Attributes{{}},
RangeMaxBytes: maxBytes,
})

// Trigger gossip callback.
if err := store.Gossip().AddInfoProto(gossip.KeySystemConfig, &config.SystemConfig{}, 0); err != nil {
Expand Down Expand Up @@ -754,7 +757,10 @@ func TestStoreRangeSplitWithMaxBytesUpdate(t *testing.T) {
// Set max bytes.
const maxBytes = 1 << 16
descID := uint32(keys.MaxReservedDescID + 1)
config.TestingSetZoneConfig(descID, config.ZoneConfig{RangeMaxBytes: maxBytes})
config.TestingSetZoneConfig(descID, config.ZoneConfig{
ReplicaAttrs: []roachpb.Attributes{{}},
RangeMaxBytes: maxBytes,
})

// Trigger gossip callback.
if err := store.Gossip().AddInfoProto(gossip.KeySystemConfig, &config.SystemConfig{}, 0); err != nil {
Expand Down
Loading