[prototype] storage: Make rebalance decisions at store-level
As outlined in recent comments on cockroachdb#26059, we need to bring back some
form of stats-based rebalancing in order to perform well on TPC-C
without manual partitioning and replica placement.

This commit contains a prototype that demonstrates the effectiveness of
moving rebalancing decisions out of the replicate queue, which operates on
arbitrarily ordered replicas of the ranges on a store, and making them at a
higher level instead. The prototype makes them at the cluster level by
running the logic on only one node, but my real proposal is to make them at
the store level.

This change in abstraction reflects what a human would do if asked to
even out the load on a cluster given perfect information about
everything happening in the cluster (a condensed sketch of the resulting
loop follows the list):

1. First, determine which stores have the most load on them (or are the most
   overfull -- but for the prototype I only considered the one dimension
   that affects TPC-C the most, QPS).
2. Decide whether the most loaded stores are so overloaded that action
   needs to be taken.
3. Examine the hottest replicas on the store (maybe not the absolute
   hottest in practice, since moving that one could disrupt user traffic,
   but in the prototype this seems to work fine) and attempt to move them
   to under-utilized stores.  If this can be done simply by transferring
   leases to under-utilized stores, then do so. If moving leases isn't
   enough, then also rebalance replicas from the hottest store to
   under-utilized stores.
4. Repeat periodically to handle changes in load or cluster membership.
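
To make the shape of that loop concrete, here is a condensed sketch of one
iteration in Go. It mirrors the prototype below (findHottestStore, qps,
qpsPerStore, and hottestRangesByStore are from the prototype; averageQPS,
otherReplicaStores, and transferLease are illustrative placeholders):

    avg := averageQPS(qpsPerStore)            // mean QPS across all stores
    upperBound := math.Max(avg*1.15, avg+100) // same slack the prototype uses
    hotStore, hotQPS := findHottestStore(qpsPerStore)
    if hotQPS <= upperBound {
        return // no store is overloaded enough to act on
    }
    for _, r := range hottestRangesByStore[hotStore] {
        for _, target := range otherReplicaStores(r) {
            // Only move the lease somewhere that will stay below the average.
            if qpsPerStore[target]+qps(r) < avg {
                transferLease(r, target)
                qpsPerStore[target] += qps(r)
                qpsPerStore[hotStore] -= qps(r)
                break
            }
        }
    }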

In a real version of this code, the plan is roughly:
1. Each store will independently run its own control loop like this one that
   is only responsible for moving leases/replicas off itself, not off other
   stores. This avoids the need for a centralized coordinator, and it will
   avoid the need to use the raft debug endpoint as long as we start
   gossiping per-store QPS info (see the sketch after this list), since each
   store already has details about its own replicas.
2. The existing replicate queue will stop making decisions motivated by
   balance. It will switch to only making decisions based on
   constraints/diversity/lease preferences, which is still needed since
   the new store-level logic will only check for store-level balance,
   not that all replicas' constraints are properly met.
3. The new code will have to avoid violating constraints/diversity/lease
   preferences.
4. The new code should consider range count, disk fullness, and maybe
   writes per second as well.
5. In order to avoid making decisions based on bad data, I'd like to
   extend lease transfers to pass along QPS data to the new leaseholder
   and preemptive snapshots to pass along WPS data to the new replica.
   This may not be strictly necessary, as shown by the success of this
   prototype, but should make for more reliable decision making.
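
As a rough illustration of the per-store trigger that step 1 implies (and of
what the commented-out needRebalance helper at the bottom of
prototype_allocator.go sketches), a store could compare its own gossiped
stats against the cluster averages. This is a hypothetical sketch, not
existing code; the threshold fraction corresponds to the new
server.test_qps_threshold setting:

    // Act only if this store's QPS is an outlier relative to the cluster average.
    func storeNeedsRebalance(myQPS, avgQPS, threshold float64) bool {
        upper := math.Max(avgQPS*(1+threshold), avgQPS+100)
        lower := math.Min(avgQPS*(1-threshold), avgQPS-100)
        return myQPS > upper || myQPS < lower
    }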

I tested this out on TPC-C 5k on 15 nodes and was able to consistently
get 94% efficiency, which is the max I've seen using a build of the
workload generator that erroneously includes the ramp-up period in its
final stats. The first run with this code only got 85% because it took a
couple of minutes to make all the lease transfers it wanted, but all
subsequent runs hit that peak efficiency while making only a negligible
number of lease transfers.

Note that I didn't even have to implement replica rebalancing to get
these results, which oddly contradicts my previous claims. However, I
believe that's because I did the initial split/scatter using a binary
containing cockroachdb#26438, so the replicas were already better scattered than
they would be by default. I ran TPC-C on that build without these changes a
couple of times, though, and didn't get better than 65% efficiency, so the
scatter alone wasn't the cause of the good results here.

Touches cockroachdb#26059, cockroachdb#17979

Release note: None
a-robinson committed Jun 11, 2018
1 parent dde686a commit 2aa46ad
Showing 5 changed files with 311 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -39,6 +39,7 @@
<tr><td><code>server.remote_debugging.mode</code></td><td>string</td><td><code>local</code></td><td>set to enable remote debugging, localhost-only or disable (any, local, off)</td></tr>
<tr><td><code>server.shutdown.drain_wait</code></td><td>duration</td><td><code>0s</code></td><td>the amount of time a server waits in an unready state before proceeding with the rest of the shutdown process</td></tr>
<tr><td><code>server.shutdown.query_wait</code></td><td>duration</td><td><code>10s</code></td><td>the server will wait for at least this amount of time for active queries to finish</td></tr>
<tr><td><code>server.test_qps_threshold</code></td><td>float</td><td><code>1.5E-01</code></td><td>the maximum fraction a store's qps can differ from the average before store-level rebalancing kicks in</td></tr>
<tr><td><code>server.time_until_store_dead</code></td><td>duration</td><td><code>5m0s</code></td><td>the time after which if there is no new gossiped information about a store, it is considered dead</td></tr>
<tr><td><code>server.web_session_timeout</code></td><td>duration</td><td><code>168h0m0s</code></td><td>the duration that a newly created web session will be valid</td></tr>
<tr><td><code>sql.defaults.distsql</code></td><td>enumeration</td><td><code>1</code></td><td>Default distributed SQL execution mode [off = 0, auto = 1, on = 2]</td></tr>
295 changes: 295 additions & 0 deletions pkg/server/prototype_allocator.go
@@ -0,0 +1,295 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package server

import (
	"container/heap"
	"context"
	"math"
	"time"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
	"github.com/cockroachdb/cockroach/pkg/settings"
	"github.com/cockroachdb/cockroach/pkg/util/log"
)

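// testQPSThreshold is a prototype-only setting for how far a store's QPS may
// deviate from the cluster average before store-level rebalancing kicks in.
// Note that the control loop below currently hardcodes a 1.15 multiplier
// rather than reading this setting.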
var testQPSThreshold = settings.RegisterNonNegativeFloatSetting(
	"server.test_qps_threshold",
	"the maximum fraction a store's qps can differ from the average before store-level rebalancing kicks in",
	0.25,
)

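// RunStoreLevelAllocator is the prototype's rebalancing control loop. It runs
// only on node 1, periodically pulls per-range QPS from the raft debug
// endpoint, and transfers leases away from the hottest store until that
// store's QPS is within bounds of the cluster average.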
func (s *Server) RunStoreLevelAllocator(ctx context.Context) {
	if s.NodeID() != 1 {
		return
	}

	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		// Wait out the first tick before doing anything since the store is still
		// starting up and we might as well wait for some qps/wps stats to
		// accumulate.
		select {
		case <-s.stopper.ShouldQuiesce():
			return
		case <-ticker.C:
		}

		/*
			stores, _, _ := s.storePool.GetPublicStoreList()
			if !needRebalance(stores) {
				continue
			}
		*/

		log.Infof(ctx, "starting prototype allocator loop")

		resp, err := s.status.RaftDebug(ctx, &serverpb.RaftDebugRequest{})
		if err != nil {
			log.Errorf(ctx, "failed to retrieve raft debug info: %s", err)
			continue
		}

		//sortedStoreQPS, hottestRanges := processResponse(resp)
		//if len(sortedStoreQPS) == 0 {
		qpsPerStore, hottestRangesByStore := processResponse(resp)
		if len(qpsPerStore) == 0 {
			log.Infof(ctx, "received no stores to process: %+v", resp)
			continue
		}

		log.Infof(ctx, "qpsPerStore: %v", qpsPerStore)

		var avgQPS float64
		//for _, store := range sortedStoreQPS {
		//avgQPS += store.qps
		//}
		//avgQPS /= len(sortedStoreQPS)
		for _, qps := range qpsPerStore {
			avgQPS += qps
		}
		avgQPS /= float64(len(qpsPerStore))
		upperBound := math.Max(avgQPS*1.15, avgQPS+100)
		log.Infof(ctx, "avgQPS: %f, upperBound: %f", avgQPS, upperBound)

		// TODO: Also consider trying to move work to under-utilized stores even
		// if there aren't any outliers at the top end.
	topLevelLoop:
		for {
			// Try to lessen the load on the hottest store.
			hottestStore, hottestQPS := findHottestStore(qpsPerStore)
			log.Infof(ctx, "hottestStore: s%d, hottestQPS: %f", hottestStore, hottestQPS)
			if hottestQPS <= upperBound {
				break topLevelLoop
			}

			hottestRanges := hottestRangesByStore[hottestStore]
			var rangeIDs []roachpb.RangeID
			for i := range hottestRanges {
				rangeIDs = append(rangeIDs, hottestRanges[i].RangeID)
			}
			log.Infof(ctx, "hottest rangeIDs: %v", rangeIDs)

			// First check if there are any leases we can reasonably move.
			for i, r := range hottestRanges {
				qps := qps(r)
				log.Infof(ctx, "considering r%d, qps=%f", r.RangeID, qps)
				for j := range r.Nodes {
					storeID := r.Nodes[j].Range.SourceStoreID
					// Transfer the lease if we can move it to a store that will still be
					// under the average per-store QPS.
					if qpsPerStore[storeID]+qps < avgQPS {
						// Attempt to transfer the lease, and make sure we don't do
						// anything else to the range this go-round.
						hottestRangesByStore[hottestStore] = append(hottestRangesByStore[hottestStore][:i], hottestRangesByStore[hottestStore][i+1:]...)
						log.Infof(ctx, "transferring lease for r%d (qps=%f) to s%d (qps=%f)", r.RangeID, qps, storeID, qpsPerStore[storeID])
						if err := s.db.AdminTransferLease(ctx, r.Nodes[j].Range.State.ReplicaState.Desc.StartKey, storeID); err != nil {
							log.Errorf(ctx, "error transferring lease for r%d to s%d: %s", r.RangeID, storeID, err)
							continue topLevelLoop
						}
						qpsPerStore[storeID] += qps
						qpsPerStore[hottestStore] -= qps
						continue topLevelLoop
					}
				}
			}

			log.Infof(ctx, "failed to find a store to transfer a lease to")
			break topLevelLoop

			/*
				// If that didn't work out, then resort to rebalancing replicas.
				log.Infof(ctx, "failed to find a store to transfer a lease to; beginning to consider replica rebalances")
				hottestRanges := hottestRangesByStore[hottestStore]
				var rangeIDs []roachpb.RangeID
				for i := range hottestRanges {
					rangeIDs = append(rangeIDs, hottestRanges[i].RangeID)
				}
				log.Infof(ctx, "hottest remaining rangeIDs: %v", rangeIDs)
				for i, r := range hottestRanges {
					qps := qps(r)
					log.Infof(ctx, "considering r%d, qps=%f", r.RangeID, qps)
					for j := range r.Nodes {
					}
				}
				// TODO
				//storage.TestingRelocateRange(ctx, s.db, rangeDesc, targets)
			*/
		}
	}
}

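// findHottestStore returns the store with the highest QPS and that QPS.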
func findHottestStore(qpsPerStore map[roachpb.StoreID]float64) (roachpb.StoreID, float64) {
	var storeID roachpb.StoreID
	var qps float64
	for s, q := range qpsPerStore {
		if q > qps {
			storeID = s
			qps = q
		}
	}
	return storeID, qps
}

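// findColdestStore returns the store with the lowest QPS and that QPS.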
func findColdestStore(qpsPerStore map[roachpb.StoreID]float64) (roachpb.StoreID, float64) {
	var storeID roachpb.StoreID
	qps := math.MaxFloat64
	for s, q := range qpsPerStore {
		if q < qps {
			storeID = s
			qps = q
		}
	}
	return storeID, qps
}

/*
type storeQPS struct {
	storeID roachpb.StoreID
	qps     float64
}
*/

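// processResponse aggregates the raft debug response into per-store
// leaseholder QPS totals and, for each store, up to 32 of its hottest ranges
// sorted from hottest to coldest.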
func processResponse(
	resp *serverpb.RaftDebugResponse,
) (map[roachpb.StoreID]float64, map[roachpb.StoreID][]*serverpb.RaftRangeStatus) {
	qpsPerStore := make(map[roachpb.StoreID]float64)
	hottestRangeQueues := make(map[roachpb.StoreID]*PriorityQueue)
	for _, r := range resp.Ranges {
		r := r
		lease, qps := leaseAndQPS(&r)
		qpsPerStore[lease] += qps
		pq := hottestRangeQueues[lease]
		if pq == nil {
			pq = &PriorityQueue{}
			heap.Init(pq)
			hottestRangeQueues[lease] = pq
		}
		heap.Push(pq, &r)
		if pq.Len() > 32 {
			heap.Pop(pq)
		}
	}

	/*
		sortedQPS := make([]storeQPS, len(qpsPerStore), 0)
		for storeID, qps := range qpsPerStore {
			sortedQPS = append(sortedQPS, storeQPS{storeID, qps})
		}
		sort.Slice(sortedQPS, func(i, j int) bool { return sortedQPS[i].qps < sortedQPS[j].qps })
	*/

	hottestRanges := make(map[roachpb.StoreID][]*serverpb.RaftRangeStatus)
	for storeID, pq := range hottestRangeQueues {
		length := pq.Len()
		hottestRanges[storeID] = make([]*serverpb.RaftRangeStatus, length)
		rangeQPS := make([]float64, length)
		for i := 1; i <= length; i++ {
			hottestRanges[storeID][length-i] = heap.Pop(pq).(*serverpb.RaftRangeStatus)
			rangeQPS[length-i] = qps(hottestRanges[storeID][length-i])
		}
		log.Infof(context.TODO(), "hottest ranges for s%d: %v", storeID, rangeQPS)
	}

	return qpsPerStore, hottestRanges
}

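// qps returns the queries per second reported for the range by its
// leaseholder replica.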
func qps(r *serverpb.RaftRangeStatus) float64 {
	_, qps := leaseAndQPS(r)
	return qps
}

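// leaseAndQPS returns the range's leaseholder store and the QPS that store
// reports for the range, or (0, 0) if no replica claims the lease.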
func leaseAndQPS(r *serverpb.RaftRangeStatus) (roachpb.StoreID, float64) {
	for i := range r.Nodes {
		if r.Nodes[i].Range.State.ReplicaState.Lease.Replica.StoreID == r.Nodes[i].Range.SourceStoreID {
			return r.Nodes[i].Range.SourceStoreID, r.Nodes[i].Range.Stats.QueriesPerSecond
		}
	}
	return 0, 0
}

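// PriorityQueue is a min-heap of ranges ordered by QPS, used to track the
// hottest ranges seen for each store.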
type PriorityQueue []*serverpb.RaftRangeStatus

func (pq PriorityQueue) Len() int { return len(pq) }

func (pq PriorityQueue) Less(i, j int) bool {
	return qps(pq[i]) < qps(pq[j])
}

func (pq PriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
}

func (pq *PriorityQueue) Push(x interface{}) {
	item := x.(*serverpb.RaftRangeStatus)
	*pq = append(*pq, item)
}

func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	item := old[n-1]
	*pq = old[0 : n-1]
	return item
}

/*
func needRebalance(stores storage.PublicStoreList) bool {
	for _, desc := range stores.Stores {
		if rangeCountNeedsRebalance(desc.Capacity.RangeCount, stores.avgRangeCount) {
			return true
		}
		if diskUsageNeedsRebalance(desc.Capacity.RangeCount, stores.avgRangeCount) {
			return true
		}
		if wpsNeedsRebalance(desc.Capacity.RangeCount, stores.avgRangeCount) {
			return true
		}
	}
	return false
}

func rangeCountNeedsRebalance(rangeCount, avgRangeCount float64) bool {
	upperBound := math.Max(avgRangeCount*1.1, avgRangeCount+10)
	lowerBound := math.Min(avgRangeCount*0.9, avgRangeCount-10)
	return rangeCount > upperBound || rangeCount < lowerBound
}
*/
2 changes: 2 additions & 0 deletions pkg/server/server.go
@@ -1647,6 +1647,8 @@ If problems persist, please see ` + base.DocsURL("cluster-setup-troubleshooting.
}
}

s.stopper.RunWorker(ctx, s.RunStoreLevelAllocator)

log.Event(ctx, "server ready")

return nil
12 changes: 12 additions & 0 deletions pkg/storage/store_pool.go
@@ -692,3 +692,15 @@ func (sp *StorePool) getNodeLocalityString(nodeID roachpb.NodeID) string {
}
return locality.str
}

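// PublicStoreList is a prototype-only exported view of the store pool's
// store descriptors and cluster-wide averages.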
type PublicStoreList struct {
	Stores             []roachpb.StoreDescriptor
	AvgRanges          float64
	AvgLeases          float64
	AvgLogicalBytes    float64
	AvgWritesPerSecond float64
}

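// GetPublicStoreList exposes the store pool's internal, unfiltered store list
// for use by the prototype allocator.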
func (sp *StorePool) GetPublicStoreList() (StoreList, int, int) {
	return sp.getStoreList(0, storeFilterNone)
}
2 changes: 1 addition & 1 deletion pkg/util/mon/bytes_usage.go
@@ -589,7 +589,7 @@ func (mm *BytesMonitor) reserveBytes(ctx context.Context, x int64) error {
// limit the amount of log messages when a size blowup is caused by
// many small allocations.
if bits.Len64(uint64(mm.mu.curAllocated)) != bits.Len64(uint64(mm.mu.curAllocated-x)) {
log.Infof(ctx, "%s: bytes usage increases to %s (+%d)",
log.VEventf(ctx, 3, "%s: bytes usage increases to %s (+%d)",
mm.name,
humanizeutil.IBytes(mm.mu.curAllocated), x)
}
