Skip to content

Commit

Permalink
ccl/sqlproxyccl: run rebalancer queue processor in the background
Browse files Browse the repository at this point in the history
The previous commit added a rebalancer queue. This commit connects the queue to
the balancer, and runs the queue processor in the background. By the default,
we limit up to 100 concurrent transfers at any point in time, and each transfer
will be retried up to 3 times.

Release note: None
  • Loading branch information
jaylim-crl committed Apr 5, 2022
1 parent 5ff9c33 commit 02b5be6
Show file tree
Hide file tree
Showing 6 changed files with 355 additions and 19 deletions.
5 changes: 5 additions & 0 deletions pkg/ccl/sqlproxyccl/balancer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ go_library(
"//pkg/roachpb",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_marusama_semaphore//:semaphore",
],
)

Expand All @@ -33,7 +35,10 @@ go_test(
"//pkg/ccl/sqlproxyccl/tenant",
"//pkg/roachpb",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
136 changes: 131 additions & 5 deletions pkg/ccl/sqlproxyccl/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/marusama/semaphore"
Expand All @@ -26,6 +28,37 @@ import (
// for selection.
var ErrNoAvailablePods = errors.New("no available pods")

// defaultMaxConcurrentRebalances represents the maximum number of concurrent
// rebalance requests that are being processed. This effectively limits the
// number of concurrent transfers per proxy.
const defaultMaxConcurrentRebalances = 100

// maxTransferAttempts represents the maximum number of transfer attempts per
// rebalance requests when the previous attempts failed (possibly due to an
// unsafe transfer point). Note that each transfer attempt currently has a
// timeout of 15 seconds, so retrying up to 3 times may hold onto processSem
// up to 45 seconds for each rebalance request.
//
// TODO(jaylim-crl): Reduce transfer timeout to 5 seconds.
const maxTransferAttempts = 3

// balancerOptions controls the behavior of the balancer component.
type balancerOptions struct {
maxConcurrentRebalances int
}

// Option defines an option that can be passed to NewBalancer in order to
// control its behavior.
type Option func(opts *balancerOptions)

// MaxConcurrentRebalances defines the maximum number of concurrent rebalance
// operations for the balancer. This defaults to defaultMaxConcurrentRebalances.
func MaxConcurrentRebalances(max int) Option {
return func(opts *balancerOptions) {
opts.maxConcurrentRebalances = max
}
}

// Balancer handles load balancing of SQL connections within the proxy.
// All methods on the Balancer instance are thread-safe.
type Balancer struct {
Expand All @@ -37,16 +70,54 @@ type Balancer struct {
// be used for load balancing.
rng *rand.Rand
}

// stopper is used to start async tasks (e.g. transfer requests) within the
// balancer.
stopper *stop.Stopper

// queue represents the rebalancer queue. All transfer requests should be
// enqueued to this queue instead of calling the transfer API directly.
queue *rebalancerQueue

// processSem is used to limit the number of concurrent rebalance requests
// that are being processed.
processSem semaphore.Semaphore
}

// NewBalancer constructs a new Balancer instance that is responsible for
// load balancing SQL connections within the proxy.
//
// TODO(jaylim-crl): Update Balancer to take in a ConnTracker object.
func NewBalancer() *Balancer {
b := &Balancer{}
func NewBalancer(ctx context.Context, stopper *stop.Stopper, opts ...Option) (*Balancer, error) {
// Handle options.
options := &balancerOptions{}
for _, opt := range opts {
opt(options)
}
if options.maxConcurrentRebalances == 0 {
options.maxConcurrentRebalances = defaultMaxConcurrentRebalances
}

// Ensure that ctx gets cancelled on stopper's quiescing.
ctx, _ = stopper.WithCancelOnQuiesce(ctx)

q, err := newRebalancerQueue(ctx)
if err != nil {
return nil, err
}

b := &Balancer{
stopper: stopper,
queue: q,
processSem: semaphore.New(options.maxConcurrentRebalances),
}
b.mu.rng, _ = randutil.NewPseudoRand()
return b

if err := b.stopper.RunAsyncTask(ctx, "processQueue", b.processQueue); err != nil {
return nil, err
}

return b, nil
}

// SelectTenantPod selects a tenant pod from the given list based on a weighted
Expand All @@ -68,6 +139,61 @@ func (b *Balancer) randFloat32() float32 {
return b.mu.rng.Float32()
}

// processQueue runs on a background goroutine, and invokes TransferConnection
// for each rebalance request.
func (b *Balancer) processQueue(ctx context.Context) {
// processOneReq processors a request from the balancer queue. If the queue
// is empty, this blocks. This returns true if processing should continue,
// or false otherwise.
processOneReq := func() (canContinue bool) {
if err := b.processSem.Acquire(ctx, 1); err != nil {
log.Errorf(ctx, "could not acquire processSem: %v", err.Error())
return false
}

req, err := b.queue.dequeue(ctx)
if err != nil {
// Context is cancelled.
log.Errorf(ctx, "could not dequeue from rebalancer queue: %v", err.Error())
return false
}

// TODO(jaylim-crl): implement enhancements:
// 1. Add metrics to track the number of active transfers.
// 2. Rate limit the number of transfers per connection (e.g. once
// every 5 minutes). This ensures that the connection isn't
// ping-ponged between pods within a short interval. However, for
// draining ones, we may want to move right away (or after 60 secs),
// even if the connection was recently transferred to the draining
// pod.
if err := b.stopper.RunAsyncTask(ctx, "processQueue-item", func(ctx context.Context) {
defer b.processSem.Release(1)

// Each request is retried up to maxTransferAttempts.
for i := 0; i < maxTransferAttempts && ctx.Err() == nil; i++ {
// TODO(jaylim-crl): Once the TransferConnection API accepts a
// destination, we could update this code, and pass along dst.
err := req.conn.TransferConnection( /* req.dst */ )
if err == nil || errors.Is(err, context.Canceled) ||
req.dst == req.conn.ServerRemoteAddr() {
break
}

// Retry again if the connection hasn't been closed or
// transferred to the destination.
time.Sleep(250 * time.Millisecond)
}
}); err != nil {
// We should not hit this case, but if we did, log and abandon the
// transfer.
log.Errorf(ctx, "could not run async task for processQueue-item: %v", err.Error())
}
return true
}
for ctx.Err() == nil && processOneReq() {
}
}

// rebalanceRequest corresponds to a rebalance request. For now, this only
// indicates where the connection should be transferred to through dst.
type rebalanceRequest struct {
Expand All @@ -76,8 +202,8 @@ type rebalanceRequest struct {
dst string
}

// rebalancerQueue represents the balancer's internal queue which is used for
// rebalancing requests.
// balancerQueue represents the balancer's internal queue which is used for
// rebalancing requests. All methods on the queue are thread-safe.
type rebalancerQueue struct {
mu syncutil.Mutex
sem semaphore.Semaphore
Expand Down
Loading

0 comments on commit 02b5be6

Please sign in to comment.