Merge #80527 #80736
80527: ccl/sqlproxyccl: periodically rebalance active and idle partitions r=JeffSwenson a=jaylim-crl

Previously, we were only transferring connections away from DRAINING pods.
This commit adds support for transferring connections away from overloaded
RUNNING pods. A pod is considered overloaded if its number of assignments
exceeds the average number of assignments across all RUNNING pods by more
than 15%. Note that active and idle partitions are rebalanced independently.
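
As an illustration, the 15% threshold yields a balanced interval computed
with floor/ceil and a minimum bound of 1. A minimal runnable sketch (the
helper name is hypothetical, not the balancer's actual API):

package main

import (
	"fmt"
	"math"
)

// balancedBounds returns the inclusive [lower, upper] range that a pod's
// assignment count must fall within to be considered balanced, given the
// mean assignment count and a percent deviation such as 0.15.
func balancedBounds(mean, percentDeviation float64) (lower, upper int) {
	lower = int(math.Max(1, math.Floor(mean*(1-percentDeviation))))
	upper = int(math.Max(1, math.Ceil(mean*(1+percentDeviation))))
	return lower, upper
}

func main() {
	lower, upper := balancedBounds(10, 0.15)
	fmt.Println(lower, upper) // prints: 8 12
}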

Release note: None

80736: server: allow users with VIEWACTIVITY role to access Hot ranges page r=koorosh a=koorosh

Previously, only users with the admin role could access the Hot Ranges page
in DB Console. Now, users with the VIEWACTIVITY or VIEWACTIVITYREDACTED
role can access the Hot Ranges page as well.
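
A minimal sketch of the resulting access rule, with hypothetical names
standing in for the server's actual privilege checks:

package main

import "fmt"

// canViewHotRanges mirrors the described behavior: admins retain access,
// and the VIEWACTIVITY or VIEWACTIVITYREDACTED role now grants it as well.
func canViewHotRanges(isAdmin, hasViewActivity, hasViewActivityRedacted bool) bool {
	return isAdmin || hasViewActivity || hasViewActivityRedacted
}

func main() {
	fmt.Println(canViewHotRanges(false, true, false)) // prints: true
}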

Release note: None

Resolves: #79953

Co-authored-by: Jay <[email protected]>
Co-authored-by: Andrii Vorobiov <[email protected]>
3 people committed May 2, 2022
3 parents d058efa + 4142dfc + d3dad17 commit f4db230
Showing 7 changed files with 925 additions and 115 deletions.
pkg/ccl/sqlproxyccl/balancer/balancer.go: 233 additions & 36 deletions
@@ -12,6 +12,7 @@ import (
"container/list"
"context"
"math"
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
@@ -37,6 +38,28 @@ const (
// DRAINING state before the proxy starts moving connections away from it.
minDrainPeriod = 1 * time.Minute

// rebalancePercentDeviation defines the percentage threshold that the
// current number of assignments can deviate from the mean. Having a
// 15% "deadzone" reduces frequent transfers, especially when load is
// fluctuating.
//
// For example, if the percent deviation is 0.15, and the mean is 10, the
// number of assignments for every pod has to be within [8, 12] to be
// considered balanced.
//
// NOTE: This must be between 0 and 1 inclusive.
rebalancePercentDeviation = 0.15

// rebalanceRate defines the rate of rebalancing assignments across SQL
// pods. This rate applies to both RUNNING and DRAINING pods. For example,
// consider the case where the rate is 0.50; if we have decided that we need
// to move 15 assignments away from a particular pod, only 8 of those
// assignments will be moved at a time.
//
// NOTE: This must be between 0 and 1 inclusive. 0 means no rebalancing
// will occur.
rebalanceRate = 0.50

// defaultMaxConcurrentRebalances represents the maximum number of
// concurrent rebalance requests that are being processed. This effectively
// limits the number of concurrent transfers per proxy.
@@ -296,9 +319,9 @@ func (b *Balancer) rebalance(ctx context.Context) {
continue
}

-// Build a podMap so we could easily retrieve the pod by address.
// Construct a map so we could easily retrieve the pod by address.
podMap := make(map[string]*tenant.Pod)
-hasRunningPod := false
var hasRunningPod bool
for _, pod := range tenantPods {
podMap[pod.Addr] = pod

@@ -316,45 +339,214 @@ func (b *Balancer) rebalance(ctx context.Context) {
continue
}

-connMap := b.connTracker.GetConnsMap(tenantID)
-for addr, podConns := range connMap {
-pod, ok := podMap[addr]
-if !ok {
-// We have a connection to the pod, but the pod is not in the
-// directory cache. This race case happens if the connection
-// was transferred by a different goroutine to this new pod
-// right after we fetch the list of pods from the directory
-// cache above. Ignore here, and this connection will be handled
-// on the next rebalance loop.
-continue
-}
activeList, idleList := b.connTracker.listAssignments(tenantID)
b.rebalancePartition(podMap, activeList)
b.rebalancePartition(podMap, idleList)
}
}

-// Transfer all connections in DRAINING pods.
-//
-// TODO(jaylim-crl): Consider extracting this logic for the DRAINING
-// case into a separate function once we add the rebalancing logic.
-if pod.State != tenant.DRAINING {
-continue
-}
// rebalancePartition rebalances the given assignments partition.
func (b *Balancer) rebalancePartition(
pods map[string]*tenant.Pod, assignments []*ServerAssignment,
) {
// Nothing to do here.
if len(pods) == 0 || len(assignments) == 0 {
return
}

-// Only move connections for pods which have been draining for
-// at least 1 minute. When load is fluctuating, the pod may
-// transition back and forth between the DRAINING and RUNNING
-// states. This check prevents us from moving connections around
-// when that happens.
-drainingFor := b.timeSource.Now().Sub(pod.StateTimestamp)
-if drainingFor < minDrainPeriod {
-continue
-}
// Transfer assignments away if the partition is in an imbalanced state.
toMove := collectRunningPodAssignments(pods, assignments, rebalancePercentDeviation)
b.enqueueRebalanceRequests(toMove)

-for _, c := range podConns {
-b.queue.enqueue(&rebalanceRequest{
-createdAt: b.timeSource.Now(),
-conn: c,
-})
-}
// Move all assignments away from DRAINING pods if and only if the pods have
// been draining for at least minDrainPeriod.
toMove = collectDrainingPodAssignments(pods, assignments, b.timeSource)
b.enqueueRebalanceRequests(toMove)
}

// enqueueRebalanceRequests enqueues N random server assignments for a transfer
// operation based on the defined rebalance rate. For example, if there are 10
// server assignments in the input list, and rebalance rate is 0.4, four server
// assignments will be selected at random, and enqueued for a transfer.
//
// NOTE: Elements in the list may be shuffled around once this method returns.
func (b *Balancer) enqueueRebalanceRequests(list []*ServerAssignment) {
toMoveCount := int(math.Ceil(float64(len(list)) * float64(rebalanceRate)))
partition, _ := partitionNRandom(list, toMoveCount)
for _, a := range partition {
b.queue.enqueue(&rebalanceRequest{
createdAt: b.timeSource.Now(),
conn: a.Owner(),
})
}
}

// collectRunningPodAssignments returns a set of ServerAssignments that have to
// be moved because the partition is in an imbalanced state. Only assignments to
// RUNNING pods will be accounted for.
//
// NOTE: pods should not be nil, and percentDeviation must be between [0, 1].
func collectRunningPodAssignments(
pods map[string]*tenant.Pod, partition []*ServerAssignment, percentDeviation float64,
) []*ServerAssignment {
// Construct a map indexed by addresses of pods.
podAssignments := make(map[string][]*ServerAssignment)

// Ensure that all RUNNING pods have an entry in podAssignments. Doing that
// allows us to account for new or underutilized pods.
for _, pod := range pods {
if pod.State == tenant.RUNNING {
podAssignments[pod.Addr] = nil
}
}
numAssignments := 0
for _, a := range partition {
// If the assignment's address is not in podAssignments, the pod is
// either not RUNNING (e.g. DRAINING, which is handled separately), or
// missing from the directory cache. The latter race happens if the
// connection was transferred by a different goroutine to a new pod
// right after we fetched the list of pods from the directory cache.
// Ignore here, and this connection will be handled on the next
// rebalance loop.
if _, ok := podAssignments[a.Addr()]; ok {
numAssignments++
podAssignments[a.Addr()] = append(podAssignments[a.Addr()], a)
}
}

// No pods or assignments to work with.
if len(podAssignments) == 0 || numAssignments == 0 {
return nil
}

// Calculate average number of assignments, and lower/upper bounds based
// on the rebalance percent deviation. We want to ensure that the number
// of assignments on each pod is within [lowerBound, upperBound]. If all
// of the pods are within that interval, the partition is considered to be
// balanced.
//
// Note that the bounds cannot be 0; otherwise, adding a new pod with no
// connections could still leave the partition in a "balanced" state, and
// the new pod would never receive transfers.
avgAssignments := float64(numAssignments) / float64(len(podAssignments))
lowerBound := int(math.Max(1, math.Floor(avgAssignments*(1-percentDeviation))))
upperBound := int(math.Max(1, math.Ceil(avgAssignments*(1+percentDeviation))))

// Normalize the average to fit within [lowerBound, upperBound]. This
// ensures that the average is at least 1, so we never move assignments
// away to the point where a pod is left with 0 assignments.
avgAssignments = float64(lowerBound+upperBound) / 2.0

// Construct a set of assignments that we want to move, and the algorithm to
// do so would be as follows:
//
// 1. Compute the number of assignments that we need to move. This would
// be X = MAX(n, m), where:
// n = total number of assignments that exceed the upper bound
// m = total number of assignments that fall short of lower bound
//
// 2. First pass on podAssignments: collect assignments that exceed the
// upper bound. Update podAssignments and X to reflect the remaining
// assignments accordingly.
//
// 3. Second pass on podAssignments: greedily collect as many assignments
// up to X without violating the average.
//
// The implementation below is an optimization of the algorithm described
// above, where steps 1 and 2 are combined. We also randomize the collection
// process to ensure that there are no biases.
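//
// Worked example (hypothetical numbers): suppose percentDeviation = 0.15
// and three RUNNING pods hold 9, 1, and 2 assignments. The mean is 4, so
// lowerBound = 3, upperBound = 5, and the normalized average is 4. The
// first pod exceeds the upper bound by 4 (n = 4), while the other two fall
// short of the lower bound by 2 and 1 (m = 3). The 4 assignments collected
// from the first pod already cover the shortfall, so X = MAX(4, 3) = 4
// assignments end up being returned.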

// Steps 1 and 2.
missingCount := 0
var mustMove, eligibleToMove []*ServerAssignment
var random []*ServerAssignment
for addr, d := range podAssignments {
missingCount += int(math.Max(float64(lowerBound-len(d)), 0.0))

// Move everything that exceeds the upper bound.
excess := len(d) - upperBound
if excess > 0 {
random, d = partitionNRandom(d, excess)
mustMove = append(mustMove, random...)
missingCount -= excess
}

// The remaining ones that exceed the average are eligible for a move.
// Within each pod, choose `excess` assignments randomly.
excess = len(d) - int(avgAssignments)
if excess > 0 {
random, d = partitionNRandom(d, excess)
eligibleToMove = append(eligibleToMove, random...)
}

podAssignments[addr] = d
}

// Step 3.
// Across all pods, choose `missingCount` assignments randomly.
if missingCount > 0 {
random, _ = partitionNRandom(eligibleToMove, missingCount)
mustMove = append(mustMove, random...)
}

return mustMove
}

// collectDrainingPodAssignments returns a set of ServerAssignments that have to
// be moved because the pods that they are in have been draining for at least
// minDrainPeriod.
//
// NOTE: pods and timeSource should not be nil.
func collectDrainingPodAssignments(
pods map[string]*tenant.Pod, partition []*ServerAssignment, timeSource timeutil.TimeSource,
) []*ServerAssignment {
var collected []*ServerAssignment
for _, a := range partition {
pod, ok := pods[a.Addr()]
if !ok || pod.State != tenant.DRAINING {
// Skip assignments to pods that are not DRAINING. The pod may also be
// missing from the directory cache entirely; that race happens if the
// connection was transferred by a different goroutine to a new pod
// right after we fetched the list of pods from the directory cache.
// Ignore here, and this connection will be handled on the next
// rebalance loop.
continue
}

// Only move connections for pods which have been draining for at least
// 1 minute. When load is fluctuating, the pod may transition back and
// forth between the DRAINING and RUNNING states. This check prevents us
// from moving connections around when that happens.
drainingFor := timeSource.Now().Sub(pod.StateTimestamp)
if drainingFor < minDrainPeriod {
continue
}
collected = append(collected, a)
}
return collected
}

// partitionNRandom partitions the input slice into two, with the first being
// n random elements, and the second being the remaining elements.
// - If n <= 0, (nil, nil) will be returned.
// - If n >= len(src), (src, nil) will be returned.
//
// NOTE: Elements in src may be shuffled around when this function returns;
// the partitioning is done in place, so no extra memory is allocated. The
// chosen elements are guaranteed to be the last n items of src.
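//
// For example (hypothetical values), given src = [a, b, c, d] and n = 2,
// one possible outcome is chosen = [d, b] and rest = [a, c], with the
// chosen elements occupying the last two slots of the shuffled src.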
var partitionNRandom = func(
src []*ServerAssignment, n int,
) (chosen []*ServerAssignment, rest []*ServerAssignment) {
if n <= 0 {
return nil, nil
}
if n >= len(src) {
return src, nil
}
restLen := len(src)
for i := 0; i < n; i++ {
idx := rand.Intn(restLen)
src[idx], src[restLen-1] = src[restLen-1], src[idx]
restLen--
}
return src[restLen:], src[:restLen]
}

// rebalanceRequest corresponds to a rebalance request.
@@ -397,6 +589,11 @@ func (q *rebalancerQueue) enqueue(req *rebalanceRequest) {
q.mu.Lock()
defer q.mu.Unlock()

// Test environments may create rebalanceRequests with nil owners.
if req.conn == nil {
return
}

e, ok := q.elements[req.conn]
if ok {
// Use the newer request of the two.