ccl/sqlproxyccl: periodically rebalance active and idle partitions
Previously, we were only transferring connections away from DRAINING pods.
This commit adds support for transferring connections until the partitions are
balanced. A partition is considered balanced if the number of assignments to
each pod is at most 15% away from the average number of assignments.
Note that active and idle partitions will be rebalanced independently.

Release note: None
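
As a rough sketch of the balance criterion described above (a hypothetical,
standalone helper for illustration only, not part of this commit), using the
same clamped floor/ceil bounds that the balancer computes:

package main

import (
	"fmt"
	"math"
)

// isBalanced reports whether every pod's assignment count falls within
// ±percentDeviation of the mean. The lower bound is clamped to at least 1
// so that a new pod with zero assignments is never considered balanced.
func isBalanced(counts []int, percentDeviation float64) bool {
	if len(counts) == 0 {
		return true
	}
	sum := 0
	for _, c := range counts {
		sum += c
	}
	avg := float64(sum) / float64(len(counts))
	lower := int(math.Max(1, math.Floor(avg*(1-percentDeviation))))
	upper := int(math.Ceil(avg * (1 + percentDeviation)))
	for _, c := range counts {
		if c < lower || c > upper {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(isBalanced([]int{8, 10, 12}, 0.15)) // true: mean is 10, bounds are [8, 12]
	fmt.Println(isBalanced([]int{15, 5, 10}, 0.15)) // false: 15 and 5 fall outside [8, 12]
}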
jaylim-crl committed Apr 29, 2022
1 parent 41e2600 commit b3233e4
Showing 5 changed files with 773 additions and 114 deletions.
239 changes: 203 additions & 36 deletions pkg/ccl/sqlproxyccl/balancer/balancer.go
@@ -37,6 +37,28 @@ const (
// DRAINING state before the proxy starts moving connections away from it.
minDrainPeriod = 1 * time.Minute

// rebalancePercentDeviation defines the percentage threshold that the
// current number of assignments can deviate away from the mean. Having a
// 15% "deadzone" reduces frequent transfers especially when load is
// fluctuating.
//
// For example, if the percent deviation is 0.15, and mean is 10, the
// number of assignments for every pod has to be within [8, 12] to be
// considered balanced.
//
// NOTE: This must be between 0 and 1 inclusive.
rebalancePercentDeviation = 0.15

// rebalanceRate defines the rate of rebalancing assignments across SQL
// pods. This rate applies to both RUNNING and DRAINING pods. For example,
// consider the case where the rate is 0.50; if we have decided that we need
// to move 15 assignments away from a particular pod, only 8 of those
// assignments will be moved at a time.
//
// NOTE: This must be between 0 and 1 inclusive. 0 means no rebalancing
// will occur.
rebalanceRate = 0.50

// defaultMaxConcurrentRebalances represents the maximum number of
// concurrent rebalance requests that are being processed. This effectively
// limits the number of concurrent transfers per proxy.
@@ -296,9 +318,9 @@ func (b *Balancer) rebalance(ctx context.Context) {
continue
}

// Construct a map so we could easily retrieve the pod by address.
podMap := make(map[string]*tenant.Pod)
var hasRunningPod bool
for _, pod := range tenantPods {
podMap[pod.Addr] = pod

@@ -316,45 +338,185 @@
continue
}

activeList, idleList := b.connTracker.listAssignments(tenantID)
b.rebalancePartition(podMap, activeList)
b.rebalancePartition(podMap, idleList)
}
}

// rebalancePartition rebalances the given assignments partition.
func (b *Balancer) rebalancePartition(
pods map[string]*tenant.Pod, assignments []*ServerAssignment,
) {
// Nothing to do here.
if len(pods) == 0 || len(assignments) == 0 {
return
}

// Transfer assignments away if the partition is in an imbalanced state.
toMove := collectRunningPodAssignments(pods, assignments, rebalancePercentDeviation)
b.enqueueRebalanceRequests(toMove)

// Move all assignments away from DRAINING pods if and only if the pods have
// been draining for at least minDrainPeriod.
toMove = collectDrainingPodAssignments(pods, assignments, b.timeSource)
b.enqueueRebalanceRequests(toMove)
}

// enqueueRebalanceRequests enqueues the first N server assignments for a
// transfer operation based on the defined rebalance rate. For example, if
// there are 10 server assignments in the input list, and rebalance rate is 0.4,
// only the first four server assignments will be enqueued for a transfer.
func (b *Balancer) enqueueRebalanceRequests(list []*ServerAssignment) {
toMoveCount := int(math.Ceil(float64(len(list)) * float64(rebalanceRate)))
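// For example, with rebalanceRate = 0.50 and 15 assignments in the list,
// toMoveCount = ceil(15 * 0.50) = 8.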
for i := 0; i < toMoveCount; i++ {
b.queue.enqueue(&rebalanceRequest{
createdAt: b.timeSource.Now(),
conn: list[i].Owner(),
})
}
}

// collectRunningPodAssignments returns a set of ServerAssignments that have to
// be moved because the partition is in an imbalanced state. Only assignments to
// RUNNING pods will be accounted for.
//
// NOTE: pods should not be nil, and percentDeviation must be within [0, 1].
func collectRunningPodAssignments(
pods map[string]*tenant.Pod, partition []*ServerAssignment, percentDeviation float64,
) []*ServerAssignment {
// Construct a distribution map of server assignments.
numAssignments := 0
distribution := make(map[string][]*ServerAssignment)
for _, a := range partition {
pod, ok := pods[a.Addr()]
if !ok || pod.State != tenant.RUNNING {
// We have a connection to the pod, but the pod is not in the
// directory cache. This race case happens if the connection was
// transferred by a different goroutine to this new pod right after
// we fetch the list of pods from the directory cache. Ignore here,
// and this connection will be handled on the next rebalance loop.
// Assignments to non-RUNNING (e.g. DRAINING) pods are skipped as
// well; those are handled by collectDrainingPodAssignments.
continue
}
distribution[a.Addr()] = append(distribution[a.Addr()], a)
numAssignments++
}

// Ensure that all RUNNING pods have an entry in distribution. Doing that
// allows us to account for new or underutilized pods.
for _, pod := range pods {
if pod.State != tenant.RUNNING {
continue
}
if _, ok := distribution[pod.Addr]; !ok {
distribution[pod.Addr] = []*ServerAssignment{}
}
}

// No pods or assignments to work with.
if len(distribution) == 0 || numAssignments == 0 {
return nil
}

// Calculate average number of assignments, and lower/upper bounds based
// on the rebalance percent deviation. We want to ensure that the number
// of assignments on each pod is within [lowerBound, upperBound]. If all
// of the pods are within that interval, the partition is considered to be
// balanced.
//
// Note that lowerBound cannot be 0, or else the addition of a new pod with
// no connections may still result in a balanced state.
avgAssignments := float64(numAssignments) / float64(len(distribution))
lowerBound := int(math.Max(1, math.Floor(avgAssignments*(1-percentDeviation))))
upperBound := int(math.Ceil(avgAssignments * (1 + percentDeviation)))
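// For illustration: with 7 assignments spread across 3 RUNNING pods,
// avgAssignments ≈ 2.33, so lowerBound = max(1, floor(2.33 * 0.85)) = 1
// and upperBound = ceil(2.33 * 1.15) = 3.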

// Construct a set of assignments that we want to move. The algorithm for
// doing so is as follows:
// 1. Compute the number of assignments that we need to move. This would
// be X = MAX(n, m), where:
// n = total number of assignments that exceed the upper bound
// m = total number of assignments that fall short of lower bound
//
// 2. First pass on distribution: collect assignments that exceed the
// upper bound. Update distribution and X to reflect the remaining
// assignments accordingly.
//
// 3. Second pass on distribution: greedily collect up to X assignments
// without pushing any pod below the average. We could theoretically
// minimize the deviation from the mean by collecting from pods
// starting with the ones with the largest number of assignments,
// but this would require a sort.
//
// The implementation below is an optimization of the algorithm described
// above, where steps 1 and 2 are combined. We will also start simple by
// omitting the sort in (3).
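//
// For example, with bounds [1, 3] (avg = 2) and distribution
// {A: 3, B: 3, C: 0}: no pod exceeds the upper bound, so steps 1 and 2
// only compute X = 1 (C falls short of the lower bound); step 3 then
// greedily collects one assignment from A or B, leaving that pod exactly
// at the average.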

// Steps 1 and 2.
missingCount := 0
var toMove []*ServerAssignment
for addr, d := range distribution {
missingCount += int(math.Max(float64(lowerBound-len(d)), 0.0))

// Move everything that exceeds the upper bound.
excess := len(d) - upperBound
if excess > 0 {
toMove = append(toMove, d[:excess]...)
distribution[addr] = d[excess:]
missingCount -= excess
}
}

// Step 3.
for addr, d := range distribution {
if missingCount <= 0 {
break
}
extra := len(d) - int(avgAssignments)
if extra <= 0 || len(d) <= 1 {
// The length check in the second condition ensures that we never
// leave a pod with 0 assignments.
continue
}
excess := int(math.Min(float64(extra), float64(missingCount)))
missingCount -= excess
toMove = append(toMove, d[:excess]...)
distribution[addr] = d[excess:]
}

return toMove
}

// collectDrainingPodAssignments returns a set of ServerAssignments that have
// to be moved because the pods they are assigned to have been draining for at
// least minDrainPeriod.
//
// NOTE: pods and timeSource should not be nil.
func collectDrainingPodAssignments(
pods map[string]*tenant.Pod, partition []*ServerAssignment, timeSource timeutil.TimeSource,
) []*ServerAssignment {
var collected []*ServerAssignment
for _, a := range partition {
pod, ok := pods[a.Addr()]
if !ok || pod.State != tenant.DRAINING {
// We have a connection to the pod, but the pod is not in the
// directory cache. This race case happens if the connection was
// transferred by a different goroutine to this new pod right after
// we fetch the list of pods from the directory cache. Ignore here,
// and this connection will be handled on the next rebalance loop.
continue
}

// Only move connections for pods which have been draining for at least
// 1 minute. When load is fluctuating, the pod may transition back and
// forth between the DRAINING and RUNNING states. This check prevents us
// from moving connections around when that happens.
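// For example, a pod that entered DRAINING 30 seconds ago is skipped on
// this pass and will be picked up on a later pass once the full
// minDrainPeriod has elapsed.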
drainingFor := timeSource.Now().Sub(pod.StateTimestamp)
if drainingFor < minDrainPeriod {
continue
}
collected = append(collected, a)
}
return collected
}

// rebalanceRequest corresponds to a rebalance request.
@@ -397,6 +559,11 @@ func (q *rebalancerQueue) enqueue(req *rebalanceRequest) {
q.mu.Lock()
defer q.mu.Unlock()

// Test environments may create rebalanceRequests with nil owners.
if req.conn == nil {
return
}

e, ok := q.elements[req.conn]
if ok {
// Use the newer request of the two.