Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ccl/sqlproxyccl: add basic Balancer component for load balancing #78156

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ ALL_TESTS = [
"//pkg/ccl/spanconfigccl/spanconfigsplitterccl:spanconfigsplitterccl_test",
"//pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl:spanconfigsqltranslatorccl_test",
"//pkg/ccl/spanconfigccl/spanconfigsqlwatcherccl:spanconfigsqlwatcherccl_test",
"//pkg/ccl/sqlproxyccl/balancer:balancer_test",
"//pkg/ccl/sqlproxyccl/denylist:denylist_test",
"//pkg/ccl/sqlproxyccl/idle:idle_test",
"//pkg/ccl/sqlproxyccl/interceptor:interceptor_test",
"//pkg/ccl/sqlproxyccl/tenant:tenant_test",
"//pkg/ccl/sqlproxyccl/tenant/servicedir:servicedir_test",
"//pkg/ccl/sqlproxyccl/tenant/simpledir:simpledir_test",
"//pkg/ccl/sqlproxyccl/throttler:throttler_test",
"//pkg/ccl/sqlproxyccl:sqlproxyccl_test",
"//pkg/ccl/storageccl/engineccl:engineccl_test",
Expand Down
6 changes: 6 additions & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/ccl/sqlproxyccl/balancer",
"//pkg/ccl/sqlproxyccl/denylist",
"//pkg/ccl/sqlproxyccl/idle",
"//pkg/ccl/sqlproxyccl/interceptor",
"//pkg/ccl/sqlproxyccl/tenant",
"//pkg/ccl/sqlproxyccl/tenant/servicedir",
"//pkg/ccl/sqlproxyccl/tenant/simpledir",
"//pkg/ccl/sqlproxyccl/throttler",
"//pkg/roachpb",
"//pkg/security/certmgr",
Expand Down Expand Up @@ -68,8 +71,11 @@ go_test(
deps = [
"//pkg/base",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/ccl/sqlproxyccl/balancer",
"//pkg/ccl/sqlproxyccl/denylist",
"//pkg/ccl/sqlproxyccl/interceptor",
"//pkg/ccl/sqlproxyccl/tenant/simpledir",
"//pkg/ccl/sqlproxyccl/tenant/testutils",
"//pkg/ccl/sqlproxyccl/tenantdirsvr",
"//pkg/ccl/sqlproxyccl/throttler",
"//pkg/ccl/utilccl",
Expand Down
24 changes: 24 additions & 0 deletions pkg/ccl/sqlproxyccl/balancer/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

# Library target for the sqlproxyccl balancer package, compiled from
# balancer.go and visible to every package in the workspace.
go_library(
name = "balancer",
srcs = ["balancer.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/balancer",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/sqlproxyccl/tenant",
"//pkg/roachpb",
],
)

# Test target for the balancer package, compiled from balancer_test.go. It
# depends on the library through the ":balancer" label.
go_test(
name = "balancer_test",
srcs = ["balancer_test.go"],
deps = [
":balancer",
"//pkg/ccl/sqlproxyccl/tenant/testutils",
"//pkg/roachpb",
"//pkg/util/leaktest",
"@com_github_stretchr_testify//require",
],
)
87 changes: 87 additions & 0 deletions pkg/ccl/sqlproxyccl/balancer/balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package balancer

import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// Balancer handles load balancing of SQL connections within the proxy. At
// present its methods forward directly to the underlying tenant directory.
// Use NewBalancer to construct an instance.
type Balancer struct {
// directory corresponds to the tenant directory that stores information
// about SQL pods for each tenant. ChoosePodAddr, ListPodAddrs, and
// ReportFailure all delegate to it.
directory tenant.Directory
}

// NewBalancer returns a Balancer backed by the given tenant directory. The
// returned instance is responsible for load balancing SQL connections within
// the proxy.
func NewBalancer(directory tenant.Directory) *Balancer {
	b := new(Balancer)
	b.directory = directory
	return b
}

// ChoosePodAddr picks one of the tenant's available SQL pods and returns its
// IP address. The pod is meant to be selected from the list using a weighted
// load balancing algorithm. When the tenant is suspended (i.e. the list is
// initially empty) and no pods are available, this triggers resumption of the
// tenant and returns the IP address of the new pod. If the tenant cannot be
// resumed, a GRPC FailedPrecondition error may be returned.
//
// A non-empty clusterName acts as an additional check: a GRPC NotFound error
// is returned if no pods match the cluster's name. This lets the proxy require
// that the incoming SQL connection "knows" some extra information about the
// tenant, such as the name of the cluster, before it is allowed to connect.
// A GRPC NotFound error is likewise returned if the tenant does not exist
// (e.g. because it was deleted).
//
// Resuming a tenant requires directory server calls, so this call can block
// for some time until the resumption process is complete.
//
// TODO(jaylim-crl): Remap GRPC NotFound and FailedPrecondition errors to a
// concrete error type. These GRPC errors should be internal to the service
// directory.
func (b *Balancer) ChoosePodAddr(
	ctx context.Context, tenantID roachpb.TenantID, clusterName string,
) (string, error) {
	// TODO(jaylim-crl): The pod's address is currently chosen within
	// selectTenantPod, which is called by EnsureTenantAddr. That logic has to
	// be extracted out into the balancer. EnsureTenantAddr should only block
	// until there is at least 1 pod, so the balancer would first call
	// EnsureTenantAddr, followed by LookupTenantAddrs to retrieve a list of
	// SQL pods. Finally, the balancer would apply the weighted load balancing
	// algorithm on the list of pods. The tenant directory should just be
	// responsible for reporting a list of SQL pods, and allowing callers to
	// resume SQL pods (e.g. EnsureTenantAddr).
	dir := b.directory
	return dir.EnsureTenantAddr(ctx, tenantID, clusterName)
}

// ListPodAddrs returns the IP addresses of every available SQL pod for the
// given tenant. A GRPC NotFound error is returned if the tenant does not exist
// (e.g. it has not yet been created) or if it has not yet been fetched into
// the directory's cache; ListPodAddrs never attempts to fetch it. Unlike
// ChoosePodAddr, when no SQL pods are available for the tenant, ListPodAddrs
// returns the empty set rather than resuming the tenant.
//
// TODO(jaylim-crl): Remap GRPC NotFound to a concrete error type. This GRPC
// error should be internal to the service directory.
func (b *Balancer) ListPodAddrs(
	ctx context.Context, tenantID roachpb.TenantID,
) ([]string, error) {
	dir := b.directory
	return dir.LookupTenantAddrs(ctx, tenantID)
}

// ReportFailure tells the balancer that a connection to addr failed. This
// marks the process at addr as unhealthy so that it is less likely to be
// chosen, and also refreshes the directory's internal cache. The report is
// currently forwarded as-is to the tenant directory.
func (b *Balancer) ReportFailure(
	ctx context.Context, tenantID roachpb.TenantID, addr string,
) error {
	dir := b.directory
	return dir.ReportFailure(ctx, tenantID, addr)
}
80 changes: 80 additions & 0 deletions pkg/ccl/sqlproxyccl/balancer/balancer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package balancer_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/balancer"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant/testutils"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

// TestBalancer checks that every Balancer method invokes the tenant directory
// with exactly the arguments it was given, and that the directory's result is
// handed back to the caller. The test will be extended as more logic is added
// to the balancer. Error cases are not exercised here since the tests in
// proxy_handler_test.go should handle that.
func TestBalancer(t *testing.T) {
	defer leaktest.AfterTest(t)()

	const (
		clusterName = "foo-bar"
		podAddr     = "127.0.0.10:42"
	)
	ctx := context.Background()
	tenantID := roachpb.MakeTenantID(10)

	// Each stub asserts that the balancer forwarded its inputs unchanged.
	ensureFn := func(
		fnCtx context.Context,
		fnTenantID roachpb.TenantID,
		fnClusterName string,
	) (string, error) {
		require.Equal(t, ctx, fnCtx)
		require.Equal(t, tenantID, fnTenantID)
		require.Equal(t, clusterName, fnClusterName)
		return podAddr, nil
	}
	lookupFn := func(
		fnCtx context.Context,
		fnTenantID roachpb.TenantID,
	) ([]string, error) {
		require.Equal(t, ctx, fnCtx)
		require.Equal(t, tenantID, fnTenantID)
		return []string{podAddr}, nil
	}
	reportFn := func(
		fnCtx context.Context,
		fnTenantID roachpb.TenantID,
		fnAddr string,
	) error {
		require.Equal(t, ctx, fnCtx)
		require.Equal(t, tenantID, fnTenantID)
		require.Equal(t, podAddr, fnAddr)
		return nil
	}

	testDir := &testutils.TestDirectory{
		EnsureTenantAddrFn:  ensureFn,
		LookupTenantAddrsFn: lookupFn,
		ReportFailureFn:     reportFn,
	}
	b := balancer.NewBalancer(testDir)

	addr, err := b.ChoosePodAddr(ctx, tenantID, clusterName)
	require.NoError(t, err)
	require.Equal(t, podAddr, addr)

	addrs, err := b.ListPodAddrs(ctx, tenantID)
	require.NoError(t, err)
	require.Equal(t, []string{podAddr}, addrs)

	err = b.ReportFailure(ctx, tenantID, podAddr)
	require.NoError(t, err)

	// Every directory method must have been called exactly once.
	ensureCount, lookupCount, reportCount := testDir.Counts()
	require.Equal(t, 1, ensureCount)
	require.Equal(t, 1, lookupCount)
	require.Equal(t, 1, reportCount)
}
Loading