Skip to content

Commit

Permalink
ccl/sqlproxyccl: add basic Balancer component for load balancing
Browse files Browse the repository at this point in the history
Previously, all the load balancing logic was within the directory cache, which
made it difficult to implement the connection rebalancing feature. This commit
extracts all of that logic out of the directory cache into a new Balancer
component, which will be responsible for load balancing moving forward. For
that to work, the directory cache will now only return pods in two forms:
- LookupTenantPods
- TryLookupTenantPods

The "try" variant will return an error if the cluster hasn't been fetched into
the directory's cache yet, and will never attempt to resume the tenant if there
are no available pods. LookupTenantPods will block until there is at least one
available SQL pod.

Release note: None
  • Loading branch information
jaylim-crl committed Mar 24, 2022
1 parent 07b7e27 commit 7235584
Show file tree
Hide file tree
Showing 15 changed files with 433 additions and 268 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ ALL_TESTS = [
"//pkg/ccl/spanconfigccl/spanconfigsplitterccl:spanconfigsplitterccl_test",
"//pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl:spanconfigsqltranslatorccl_test",
"//pkg/ccl/spanconfigccl/spanconfigsqlwatcherccl:spanconfigsqlwatcherccl_test",
"//pkg/ccl/sqlproxyccl/balancer:balancer_test",
"//pkg/ccl/sqlproxyccl/denylist:denylist_test",
"//pkg/ccl/sqlproxyccl/idle:idle_test",
"//pkg/ccl/sqlproxyccl/interceptor:interceptor_test",
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/ccl/sqlproxyccl/balancer",
"//pkg/ccl/sqlproxyccl/denylist",
"//pkg/ccl/sqlproxyccl/idle",
"//pkg/ccl/sqlproxyccl/interceptor",
Expand Down Expand Up @@ -70,6 +71,7 @@ go_test(
deps = [
"//pkg/base",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/ccl/sqlproxyccl/balancer",
"//pkg/ccl/sqlproxyccl/denylist",
"//pkg/ccl/sqlproxyccl/interceptor",
"//pkg/ccl/sqlproxyccl/tenant",
Expand Down
31 changes: 31 additions & 0 deletions pkg/ccl/sqlproxyccl/balancer/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "balancer",
srcs = [
"balancer.go",
"pod.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/balancer",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/sqlproxyccl/tenant",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
],
)

go_test(
name = "balancer_test",
srcs = [
"balancer_test.go",
"pod_test.go",
],
embed = [":balancer"],
deps = [
"//pkg/ccl/sqlproxyccl/tenant",
"//pkg/util/leaktest",
"@com_github_stretchr_testify//require",
],
)
62 changes: 62 additions & 0 deletions pkg/ccl/sqlproxyccl/balancer/balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package balancer

import (
"math/rand"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

// ErrNoAvailablePods is the sentinel error returned by SelectTenantPod when
// no pod could be selected from the given list (for example, when the list
// is empty).
var ErrNoAvailablePods = errors.New("no available pods")

// Balancer handles load balancing of SQL connections within the proxy.
// All methods on the Balancer instance are thread-safe. Instances should be
// created through NewBalancer, which initializes the random number generator
// stored in mu.rng.
type Balancer struct {
// mu groups the mutex together with the fields it guards. The mutex must
// be held while accessing any of those fields.
mu struct {
syncutil.Mutex

// rng is the pseudo-random number generator used for load balancing. It
// is assigned in NewBalancer and read under the lock in randFloat32.
rng *rand.Rand
}
}

// NewBalancer returns a ready-to-use Balancer that is responsible for load
// balancing SQL connections within the proxy. The balancer's random number
// generator is obtained from randutil.NewPseudoRand.
func NewBalancer() *Balancer {
	// Only the generator is kept; the second return value of NewPseudoRand
	// (presumably the seed -- confirm against randutil) is not needed here.
	rng, _ := randutil.NewPseudoRand()

	lb := new(Balancer)
	lb.mu.rng = rng
	return lb
}

// SelectTenantPod picks one pod out of pods using a weighted CPU load
// algorithm. All pods in the list are expected to belong to the same tenant.
// When no pod can be selected, it returns a nil pod together with
// ErrNoAvailablePods.
func (b *Balancer) SelectTenantPod(pods []*tenant.Pod) (*tenant.Pod, error) {
	// The random sample is drawn first, under the balancer's lock, and then
	// handed to the selection helper.
	if chosen := selectTenantPod(b.randFloat32(), pods); chosen != nil {
		return chosen, nil
	}
	return nil, ErrNoAvailablePods
}

// randFloat32 returns a pseudo-random float32 within the bounds [0, 1). The
// balancer's generator is only touched while the mutex is held, which makes
// this safe to call from multiple goroutines.
func (b *Balancer) randFloat32() float32 {
	guarded := &b.mu
	guarded.Lock()
	// Deferred so the lock is released even if the generator call panics.
	defer guarded.Unlock()
	return guarded.rng.Float32()
}
36 changes: 36 additions & 0 deletions pkg/ccl/sqlproxyccl/balancer/balancer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package balancer_test

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/balancer"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

// TestBalancer exercises Balancer.SelectTenantPod through the public API:
// an empty pod list must yield ErrNoAvailablePods, and a non-empty list must
// yield one of the supplied pods.
func TestBalancer(t *testing.T) {
	defer leaktest.AfterTest(t)()

	lb := balancer.NewBalancer()

	t.Run("no pods", func(t *testing.T) {
		// An empty (non-nil) list has nothing to choose from.
		candidates := []*tenant.Pod{}
		selected, err := lb.SelectTenantPod(candidates)
		require.EqualError(t, err, balancer.ErrNoAvailablePods.Error())
		require.Nil(t, selected)
	})

	t.Run("few pods", func(t *testing.T) {
		// Selection is random, so only assert that the result is one of the
		// pods that was passed in.
		candidates := []*tenant.Pod{{Addr: "1"}, {Addr: "2"}}
		selected, err := lb.SelectTenantPod(candidates)
		require.NoError(t, err)
		require.Contains(t, []string{"1", "2"}, selected.Addr)
	})
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,24 @@
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package tenant
package balancer

// selectTenantPod selects a tenant pod from the given list to received
// incoming traffic. Pods are weighted by their reported CPU load. rand must be
// a pseudo random number within the bounds [0, 1). It is suggested to use
// Float32() of a PseudoRand instance that is guarded by a mutex.
import "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"

// selectTenantPod selects a tenant pod from the given list to receive incoming
// traffic. Pods are weighted by their reported CPU load. rand must be a pseudo
// random number within the bounds [0, 1). It is suggested to use Float32() of a
// PseudoRand instance that is guarded by a mutex.
//
// rngMu.Lock()
// rand := rng.Float32()
// rngMu.Unlock()
// selectTenantPod(rand, pods)
func selectTenantPod(rand float32, pods []*Pod) *Pod {
func selectTenantPod(rand float32, pods []*tenant.Pod) *tenant.Pod {
if len(pods) == 0 {
return nil
}

if len(pods) == 1 {
return pods[0]
}
Expand Down
54 changes: 54 additions & 0 deletions pkg/ccl/sqlproxyccl/balancer/pod_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package balancer

import (
"math/rand"
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

// TestSelectTenantPods covers selectTenantPod for a nil list, a single pod,
// and several pods with different reported loads.
func TestSelectTenantPods(t *testing.T) {
	defer leaktest.AfterTest(t)()

	t.Run("no pods", func(t *testing.T) {
		// A nil list has nothing to choose from.
		selected := selectTenantPod(0, nil)
		require.Nil(t, selected)
	})

	t.Run("one pod", func(t *testing.T) {
		only := &tenant.Pod{Addr: "1"}
		selected := selectTenantPod(0, []*tenant.Pod{only})
		require.Equal(t, "1", selected.Addr)
	})

	t.Run("many pods", func(t *testing.T) {
		const iterations = 10000

		candidates := []*tenant.Pod{
			{Addr: "1", Load: 0.0},
			{Addr: "2", Load: 0.5},
			{Addr: "3", Load: 0.9},
		}

		// A fixed seed keeps the sequence of random numbers, and therefore
		// the expected counts below, reproducible across runs.
		source := rand.New(rand.NewSource(0))
		counts := make(map[string]int)
		for n := 0; n < iterations; n++ {
			counts[selectTenantPod(source.Float32(), candidates).Addr]++
		}

		// Assert that the distribution is roughly a function of 1 - Load.
		expected := map[string]int{
			"1": 6121,
			"2": 3214,
			"3": 665,
		}
		require.Equal(t, expected, counts)
	})
}
27 changes: 24 additions & 3 deletions pkg/ccl/sqlproxyccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/balancer"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -51,6 +52,12 @@ type connector struct {
// NOTE: This field is required.
DirectoryCache tenant.DirectoryCache

// Balancer represents the load balancer component that will be used to
// choose which SQL pod to route the connection to.
//
// NOTE: This field is required.
Balancer *balancer.Balancer

// StartupMsg represents the startup message associated with the client.
// This will be used when establishing a pgwire connection with the SQL pod.
//
Expand Down Expand Up @@ -263,19 +270,33 @@ func (c *connector) lookupAddr(ctx context.Context) (string, error) {
return c.testingKnobs.lookupAddr(ctx)
}

// Lookup tenant in the directory cache.
addr, err := c.DirectoryCache.LookupTenantAddr(ctx, c.TenantID, c.ClusterName)
// Look up the tenant in the directory cache. Once we have retrieved the list
// of pods, use the Balancer for load balancing.
pods, err := c.DirectoryCache.LookupTenantPods(ctx, c.TenantID, c.ClusterName)
switch {
case err == nil:
return addr, nil
// Note that LookupTenantPods will always return RUNNING pods, so this
// is fine for now. If we start changing that to also return DRAINING
// pods, we'd have to filter accordingly.
pod, err := c.Balancer.SelectTenantPod(pods)
if err != nil {
// This should never happen because LookupTenantPods ensured that
// len(pods) should never be 0. Mark it as a retriable connection
// anyway.
return "", markAsRetriableConnectorError(err)
}
return pod.Addr, nil

case status.Code(err) == codes.FailedPrecondition:
if st, ok := status.FromError(err); ok {
return "", newErrorf(codeUnavailable, "%v", st.Message())
}
return "", newErrorf(codeUnavailable, "unavailable")

case status.Code(err) == codes.NotFound:
return "", newErrorf(codeParamsRoutingFailed,
"cluster %s-%d not found", c.ClusterName, c.TenantID.ToUint64())

default:
return "", markAsRetriableConnectorError(err)
}
Expand Down
Loading

0 comments on commit 7235584

Please sign in to comment.