Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
78396: ccl/sqlproxyccl: add basic Balancer component for load balancing r=andy-kimball,JeffSwenson a=jaylim-crl

#### ccl/sqlproxyccl: rename LookupTenantAddrs to TryLookupTenantAddrs 

This commit is purely mechanical, and renames LookupTenantAddrs in the
directory cache to TryLookupTenantAddrs. This is done so that we could
rename EnsureTenantAddr to LookupTenantAddr, which is more explicit.

Release note: None
  
#### ccl/sqlproxyccl: rename EnsureTenantAddr to LookupTenantAddr 

This commit renames EnsureTenantAddr within the tenant directory cache to
LookupTenantAddr. At the same time, we moved the logic for ChoosePodAddr out
of tenantEntry into the directory cache. This will prepare us for moving the
balancing logic out of the directory cache into the balancer.

Release note: None
  
#### ccl/sqlproxyccl: TryLookupTenantAddrs now return []*tenant.Pod 

Previously, TryLookupTenantAddrs returned addresses. This commit is purely
mechanical, and updates that to return a slice of *tenant.Pod instead. We do
this so that it will be consistent with LookupTenantPod later on. A side effect
of this is that the method has been renamed to TryLookupTenantPods.

Release note: None
  
#### ccl/sqlproxyccl: add basic Balancer component for load balancing 

Previously, all the load balancing logic is within the directory cache, which
makes it difficult to implement the connection rebalancing feature. This commit
extracts all of those logic out of the directory cache into a new Balancer
component, which will be responsible for load balancing moving forward. For
that to work, the directory cache now will only return pods in two forms:
- LookupTenantPods
- TryLookupTenantPods

The "try" variant will return an error if the cluster hasn't been fetched into
the directory's cache yet, and will never attempt to resume the tenant if there
are no available pods. LookupTenantPods will block until there is at least one
available SQL pod.

Release note: None

Co-authored-by: Jay <[email protected]>
  • Loading branch information
craig[bot] and jaylim-crl committed Mar 25, 2022
2 parents d2a5885 + 7235584 commit cb10bfe
Show file tree
Hide file tree
Showing 16 changed files with 479 additions and 326 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ ALL_TESTS = [
"//pkg/ccl/spanconfigccl/spanconfigsplitterccl:spanconfigsplitterccl_test",
"//pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl:spanconfigsqltranslatorccl_test",
"//pkg/ccl/spanconfigccl/spanconfigsqlwatcherccl:spanconfigsqlwatcherccl_test",
"//pkg/ccl/sqlproxyccl/balancer:balancer_test",
"//pkg/ccl/sqlproxyccl/denylist:denylist_test",
"//pkg/ccl/sqlproxyccl/idle:idle_test",
"//pkg/ccl/sqlproxyccl/interceptor:interceptor_test",
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/ccl/sqlproxyccl/balancer",
"//pkg/ccl/sqlproxyccl/denylist",
"//pkg/ccl/sqlproxyccl/idle",
"//pkg/ccl/sqlproxyccl/interceptor",
Expand Down Expand Up @@ -70,6 +71,7 @@ go_test(
deps = [
"//pkg/base",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/ccl/sqlproxyccl/balancer",
"//pkg/ccl/sqlproxyccl/denylist",
"//pkg/ccl/sqlproxyccl/interceptor",
"//pkg/ccl/sqlproxyccl/tenant",
Expand Down
31 changes: 31 additions & 0 deletions pkg/ccl/sqlproxyccl/balancer/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "balancer",
srcs = [
"balancer.go",
"pod.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/balancer",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/sqlproxyccl/tenant",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
],
)

go_test(
name = "balancer_test",
srcs = [
"balancer_test.go",
"pod_test.go",
],
embed = [":balancer"],
deps = [
"//pkg/ccl/sqlproxyccl/tenant",
"//pkg/util/leaktest",
"@com_github_stretchr_testify//require",
],
)
62 changes: 62 additions & 0 deletions pkg/ccl/sqlproxyccl/balancer/balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package balancer

import (
"math/rand"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

// ErrNoAvailablePods is an error that indicates that no pods are available
// for selection.
var ErrNoAvailablePods = errors.New("no available pods")

// Balancer handles load balancing of SQL connections within the proxy.
// All methods on the Balancer instance are thread-safe.
type Balancer struct {
// mu synchronizes access to fields in the struct.
mu struct {
syncutil.Mutex

// rng corresponds to the random number generator instance which will
// be used for load balancing.
rng *rand.Rand
}
}

// NewBalancer constructs a new Balancer instance that is responsible for
// load balancing SQL connections within the proxy.
func NewBalancer() *Balancer {
b := &Balancer{}
b.mu.rng, _ = randutil.NewPseudoRand()
return b
}

// SelectTenantPod selects a tenant pod from the given list based on a weighted
// CPU load algorithm. It is expected that all pods within the list belongs to
// the same tenant. If no pods are available, this returns ErrNoAvailablePods.
func (b *Balancer) SelectTenantPod(pods []*tenant.Pod) (*tenant.Pod, error) {
pod := selectTenantPod(b.randFloat32(), pods)
if pod == nil {
return nil, ErrNoAvailablePods
}
return pod, nil
}

// randFloat32 generates a random float32 within the bounds [0, 1) and is
// thread-safe.
func (b *Balancer) randFloat32() float32 {
b.mu.Lock()
defer b.mu.Unlock()
return b.mu.rng.Float32()
}
36 changes: 36 additions & 0 deletions pkg/ccl/sqlproxyccl/balancer/balancer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package balancer_test

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/balancer"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

func TestBalancer(t *testing.T) {
defer leaktest.AfterTest(t)()

b := balancer.NewBalancer()

t.Run("no pods", func(t *testing.T) {
pod, err := b.SelectTenantPod([]*tenant.Pod{})
require.EqualError(t, err, balancer.ErrNoAvailablePods.Error())
require.Nil(t, pod)
})

t.Run("few pods", func(t *testing.T) {
pod, err := b.SelectTenantPod([]*tenant.Pod{{Addr: "1"}, {Addr: "2"}})
require.NoError(t, err)
require.Contains(t, []string{"1", "2"}, pod.Addr)
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,24 @@
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package tenant
package balancer

// selectTenantPod selects a tenant pod from the given list to received
// incoming traffic. Pods are weighted by their reported CPU load. rand must be
// a pseudo random number within the bounds [0, 1). It is suggested to use
// Float32() of a PseudoRand instance that is guarded by a mutex.
import "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"

// selectTenantPod selects a tenant pod from the given list to receive incoming
// traffic. Pods are weighted by their reported CPU load. rand must be a pseudo
// random number within the bounds [0, 1). It is suggested to use Float32() of a
// PseudoRand instance that is guarded by a mutex.
//
// rngMu.Lock()
// rand := rng.Float32()
// rngMu.Unlock()
// selectTenantPod(rand, pods)
func selectTenantPod(rand float32, pods []*Pod) *Pod {
func selectTenantPod(rand float32, pods []*tenant.Pod) *tenant.Pod {
if len(pods) == 0 {
return nil
}

if len(pods) == 1 {
return pods[0]
}
Expand Down
54 changes: 54 additions & 0 deletions pkg/ccl/sqlproxyccl/balancer/pod_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package balancer

import (
"math/rand"
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

func TestSelectTenantPods(t *testing.T) {
defer leaktest.AfterTest(t)()

t.Run("no pods", func(t *testing.T) {
require.Nil(t, selectTenantPod(0, nil))
})

t.Run("one pod", func(t *testing.T) {
pod := selectTenantPod(0, []*tenant.Pod{{Addr: "1"}})
require.Equal(t, "1", pod.Addr)
})

t.Run("many pods", func(t *testing.T) {
pods := []*tenant.Pod{
{Addr: "1", Load: 0.0},
{Addr: "2", Load: 0.5},
{Addr: "3", Load: 0.9},
}

distribution := map[string]int{}
rng := rand.New(rand.NewSource(0))

for i := 0; i < 10000; i++ {
pod := selectTenantPod(rng.Float32(), pods)
distribution[pod.Addr]++
}

// Assert that the distribution is a roughly function of 1 - Load.
require.Equal(t, map[string]int{
"1": 6121,
"2": 3214,
"3": 665,
}, distribution)
})
}
27 changes: 24 additions & 3 deletions pkg/ccl/sqlproxyccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/balancer"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -51,6 +52,12 @@ type connector struct {
// NOTE: This field is required.
DirectoryCache tenant.DirectoryCache

// Balancer represents the load balancer component that will be used to
// choose which SQL pod to route the connection to.
//
// NOTE: This field is required.
Balancer *balancer.Balancer

// StartupMsg represents the startup message associated with the client.
// This will be used when establishing a pgwire connection with the SQL pod.
//
Expand Down Expand Up @@ -263,19 +270,33 @@ func (c *connector) lookupAddr(ctx context.Context) (string, error) {
return c.testingKnobs.lookupAddr(ctx)
}

// Lookup tenant in the directory cache.
addr, err := c.DirectoryCache.EnsureTenantAddr(ctx, c.TenantID, c.ClusterName)
// Lookup tenant in the directory cache. Once we have retrieve the list of
// pods, use the Balancer for load balancing.
pods, err := c.DirectoryCache.LookupTenantPods(ctx, c.TenantID, c.ClusterName)
switch {
case err == nil:
return addr, nil
// Note that LookupTenantPods will always return RUNNING pods, so this
// is fine for now. If we start changing that to also return DRAINING
// pods, we'd have to filter accordingly.
pod, err := c.Balancer.SelectTenantPod(pods)
if err != nil {
// This should never happen because LookupTenantPods ensured that
// len(pods) should never be 0. Mark it as a retriable connection
// anyway.
return "", markAsRetriableConnectorError(err)
}
return pod.Addr, nil

case status.Code(err) == codes.FailedPrecondition:
if st, ok := status.FromError(err); ok {
return "", newErrorf(codeUnavailable, "%v", st.Message())
}
return "", newErrorf(codeUnavailable, "unavailable")

case status.Code(err) == codes.NotFound:
return "", newErrorf(codeParamsRoutingFailed,
"cluster %s-%d not found", c.ClusterName, c.TenantID.ToUint64())

default:
return "", markAsRetriableConnectorError(err)
}
Expand Down
Loading

0 comments on commit cb10bfe

Please sign in to comment.