Skip to content

Commit

Permalink
ccl/sqlproxyccl: add basic Balancer component for load balancing
Browse files Browse the repository at this point in the history
This commit adds a basic Balancer component that is responsible for load
balancing connections across SQL pods. It also updates the connector
component to invoke methods on the balancer, rather than on the tenant
directory, when selecting a pod. For now, the balancer contains no balancing
logic of its own and simply forwards each call to the directory.

Release note: None
  • Loading branch information
jaylim-crl committed Mar 20, 2022
1 parent 6123205 commit fd3c5be
Show file tree
Hide file tree
Showing 12 changed files with 355 additions and 78 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ ALL_TESTS = [
"//pkg/ccl/spanconfigccl/spanconfigsplitterccl:spanconfigsplitterccl_test",
"//pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl:spanconfigsqltranslatorccl_test",
"//pkg/ccl/spanconfigccl/spanconfigsqlwatcherccl:spanconfigsqlwatcherccl_test",
"//pkg/ccl/sqlproxyccl/balancer:balancer_test",
"//pkg/ccl/sqlproxyccl/denylist:denylist_test",
"//pkg/ccl/sqlproxyccl/idle:idle_test",
"//pkg/ccl/sqlproxyccl/interceptor:interceptor_test",
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/sqlproxyccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/ccl/sqlproxyccl/balancer",
"//pkg/ccl/sqlproxyccl/denylist",
"//pkg/ccl/sqlproxyccl/idle",
"//pkg/ccl/sqlproxyccl/interceptor",
Expand Down Expand Up @@ -70,10 +71,11 @@ go_test(
deps = [
"//pkg/base",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/ccl/sqlproxyccl/balancer",
"//pkg/ccl/sqlproxyccl/denylist",
"//pkg/ccl/sqlproxyccl/interceptor",
"//pkg/ccl/sqlproxyccl/tenant",
"//pkg/ccl/sqlproxyccl/tenant/simpledir",
"//pkg/ccl/sqlproxyccl/tenant/testutils",
"//pkg/ccl/sqlproxyccl/tenantdirsvr",
"//pkg/ccl/sqlproxyccl/throttler",
"//pkg/ccl/utilccl",
Expand Down
24 changes: 24 additions & 0 deletions pkg/ccl/sqlproxyccl/balancer/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "balancer",
srcs = ["balancer.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/balancer",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/sqlproxyccl/tenant",
"//pkg/roachpb",
],
)

go_test(
name = "balancer_test",
srcs = ["balancer_test.go"],
deps = [
":balancer",
"//pkg/ccl/sqlproxyccl/tenant/testutils",
"//pkg/roachpb",
"//pkg/util/leaktest",
"@com_github_stretchr_testify//require",
],
)
87 changes: 87 additions & 0 deletions pkg/ccl/sqlproxyccl/balancer/balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package balancer

import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// Balancer handles load balancing of SQL connections within the proxy. Every
// method on Balancer currently forwards to the tenant directory it was
// constructed with.
type Balancer struct {
// directory corresponds to the tenant directory that stores information
// about SQL pods for each tenant. It is set once by NewBalancer and is the
// target of every call made by ChoosePodAddr, ListPodAddrs, and
// ReportFailure.
directory tenant.Directory
}

// NewBalancer returns a Balancer that load balances SQL connections within
// the proxy, backed by the given tenant directory. The directory is stored
// as-is and is consulted by every Balancer method.
func NewBalancer(directory tenant.Directory) *Balancer {
	b := &Balancer{}
	b.directory = directory
	return b
}

// ChoosePodAddr picks one of the tenant's available SQL pods and returns its
// IP address. The pod is selected from the tenant's pod list using a weighted
// load balancing algorithm; for now that selection still happens inside the
// directory's EnsureTenantAddr, which this method delegates to (see the TODO
// in the body).
//
// If the tenant is suspended (i.e. its pod list is initially empty) and no
// pods are available, the call triggers resumption of the tenant and returns
// the IP address of the newly started pod. If the tenant cannot be resumed,
// a GRPC FailedPrecondition error may be returned.
//
// When clusterName is non-empty, a GRPC NotFound error is returned if no pods
// match the cluster's name. Callers can use this to require that an incoming
// SQL connection "knows" some additional information about the tenant, such
// as the cluster name, before it is allowed to connect. A GRPC NotFound error
// is likewise returned if the tenant does not exist (e.g. because it was
// deleted).
//
// Resuming a tenant requires directory server calls, so this method can block
// for some time until the resumption process completes.
//
// TODO(jaylim-crl): Remap GRPC NotFound and FailedPrecondition errors to a
// concrete error type. These GRPC errors should be internal to the service
// directory.
func (b *Balancer) ChoosePodAddr(
	ctx context.Context, tenantID roachpb.TenantID, clusterName string,
) (string, error) {
	// TODO(jaylim-crl): The pod's address is currently chosen by
	// selectTenantPod, which EnsureTenantAddr calls. That logic has to move
	// into the balancer: EnsureTenantAddr should only block until at least 1
	// pod exists, after which the balancer would call LookupTenantAddrs to
	// fetch the list of SQL pods and apply the weighted load balancing
	// algorithm to it. The tenant directory should only be responsible for
	// reporting the list of SQL pods and for letting callers resume SQL pods
	// (e.g. EnsureTenantAddr).
	dir := b.directory
	return dir.EnsureTenantAddr(ctx, tenantID, clusterName)
}

// ListPodAddrs returns the IP addresses of every available SQL pod for the
// given tenant. A GRPC NotFound error is returned if the tenant does not
// exist (e.g. it has not yet been created), or if it has not yet been fetched
// into the directory's cache; ListPodAddrs never attempts to fetch it. Unlike
// ChoosePodAddr, a tenant with no available SQL pods yields the empty set
// rather than triggering a resume.
//
// TODO(jaylim-crl): Remap GRPC NotFound to a concrete error type. The GRPC
// NotFound error should be internal to the service directory.
func (b *Balancer) ListPodAddrs(
	ctx context.Context, tenantID roachpb.TenantID,
) ([]string, error) {
	dir := b.directory
	return dir.LookupTenantAddrs(ctx, tenantID)
}

// ReportFailure tells the balancer that a connection to addr failed. This
// marks the process at addr as unhealthy so that it becomes less likely to be
// chosen, and also refreshes the directory's internal cache. The report is
// currently forwarded directly to the directory's ReportFailure, and that
// call's error is returned unchanged.
func (b *Balancer) ReportFailure(
	ctx context.Context, tenantID roachpb.TenantID, addr string,
) error {
	dir := b.directory
	return dir.ReportFailure(ctx, tenantID, addr)
}
80 changes: 80 additions & 0 deletions pkg/ccl/sqlproxyccl/balancer/balancer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package balancer_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/balancer"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant/testutils"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

// TestBalancer checks that each Balancer method calls the directory with the
// arguments it was given and returns the directory's result. It will be
// extended once the balancer gains logic of its own. Error paths are not
// covered here; the tests in proxy_handler_test.go are expected to handle
// those.
func TestBalancer(t *testing.T) {
	defer leaktest.AfterTest(t)()

	const (
		clusterName = "foo-bar"
		podAddr     = "127.0.0.10:42"
	)
	ctx := context.Background()
	tenantID := roachpb.MakeTenantID(10)

	// Each stub asserts on the arguments it receives, so a mismatch fails the
	// test at the point of the call.
	ensureFn := func(
		gotCtx context.Context, gotTenantID roachpb.TenantID, gotClusterName string,
	) (string, error) {
		require.Equal(t, ctx, gotCtx)
		require.Equal(t, tenantID, gotTenantID)
		require.Equal(t, clusterName, gotClusterName)
		return podAddr, nil
	}
	lookupFn := func(
		gotCtx context.Context, gotTenantID roachpb.TenantID,
	) ([]string, error) {
		require.Equal(t, ctx, gotCtx)
		require.Equal(t, tenantID, gotTenantID)
		return []string{podAddr}, nil
	}
	reportFn := func(
		gotCtx context.Context, gotTenantID roachpb.TenantID, gotAddr string,
	) error {
		require.Equal(t, ctx, gotCtx)
		require.Equal(t, tenantID, gotTenantID)
		require.Equal(t, podAddr, gotAddr)
		return nil
	}

	testDir := &testutils.TestDirectory{
		EnsureTenantAddrFn:  ensureFn,
		LookupTenantAddrsFn: lookupFn,
		ReportFailureFn:     reportFn,
	}
	b := balancer.NewBalancer(testDir)

	chosen, err := b.ChoosePodAddr(ctx, tenantID, clusterName)
	require.NoError(t, err)
	require.Equal(t, podAddr, chosen)

	listed, err := b.ListPodAddrs(ctx, tenantID)
	require.NoError(t, err)
	require.Equal(t, []string{podAddr}, listed)

	require.NoError(t, b.ReportFailure(ctx, tenantID, podAddr))

	// Every directory method must have been hit exactly once.
	numEnsure, numLookup, numReport := testDir.Counts()
	require.Equal(t, 1, numEnsure)
	require.Equal(t, 1, numLookup)
	require.Equal(t, 1, numReport)
}
46 changes: 22 additions & 24 deletions pkg/ccl/sqlproxyccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"net"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/balancer"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -45,11 +45,11 @@ type connector struct {
ClusterName string
TenantID roachpb.TenantID

// Directory corresponds to the tenant directory, which will be used to
// resolve tenants to their corresponding IP addresses.
// Balancer represents the load balancer component that will be used to
// route connections to SQL pods.
//
// NOTE: This field is required.
Directory tenant.Directory
Balancer *balancer.Balancer

// StartupMsg represents the startup message associated with the client.
// This will be used when establishing a pgwire connection with the SQL pod.
Expand Down Expand Up @@ -215,21 +215,19 @@ func (c *connector) dialTenantCluster(ctx context.Context) (net.Conn, error) {
dialSQLServerErrs = 0
}

// Report the failure to the directory so that it can refresh
// Report the failure to the balancer so that it can refresh
// any stale information that may have caused the problem.
if c.Directory != nil {
if err = reportFailureToDirectory(
ctx, c.TenantID, serverAddr, c.Directory,
); err != nil {
reportFailureErrs++
if reportFailureErr.ShouldLog() {
log.Ops.Errorf(ctx,
"report failure (%d errors skipped): %v",
reportFailureErrs,
err,
)
reportFailureErrs = 0
}
if err = reportFailureToBalancer(
ctx, c.TenantID, serverAddr, c.Balancer,
); err != nil {
reportFailureErrs++
if reportFailureErr.ShouldLog() {
log.Ops.Errorf(ctx,
"report failure (%d errors skipped): %v",
reportFailureErrs,
err,
)
reportFailureErrs = 0
}
}
continue
Expand Down Expand Up @@ -264,7 +262,7 @@ func (c *connector) lookupAddr(ctx context.Context) (string, error) {
return c.testingKnobs.lookupAddr(ctx)
}

addr, err := c.Directory.EnsureTenantAddr(ctx, c.TenantID, c.ClusterName)
addr, err := c.Balancer.ChoosePodAddr(ctx, c.TenantID, c.ClusterName)
switch {
case err == nil:
return addr, nil
Expand Down Expand Up @@ -337,10 +335,10 @@ func isRetriableConnectorError(err error) bool {
return errors.Is(err, errRetryConnectorSentinel)
}

// reportFailureToDirectory is a hookable function that calls the given tenant
// directory's ReportFailure method.
var reportFailureToDirectory = func(
ctx context.Context, tenantID roachpb.TenantID, addr string, directory tenant.Directory,
// reportFailureToBalancer is a hookable function that calls the given
// balancer's ReportFailure method.
var reportFailureToBalancer = func(
ctx context.Context, tenantID roachpb.TenantID, addr string, balancer *balancer.Balancer,
) error {
return directory.ReportFailure(ctx, tenantID, addr)
return balancer.ReportFailure(ctx, tenantID, addr)
}
Loading

0 comments on commit fd3c5be

Please sign in to comment.