Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.1: ccl/sqlproxyccl: rename tenant.Resolver to tenant.DirectoryCache #78394

Merged
merged 2 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ go_test(
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/ccl/sqlproxyccl/denylist",
"//pkg/ccl/sqlproxyccl/interceptor",
"//pkg/ccl/sqlproxyccl/tenant",
"//pkg/ccl/sqlproxyccl/tenantdirsvr",
"//pkg/ccl/sqlproxyccl/throttler",
"//pkg/ccl/utilccl",
Expand Down
84 changes: 28 additions & 56 deletions pkg/ccl/sqlproxyccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -34,39 +35,6 @@ const sessionRevivalTokenStartupParam = "crdb:session_revival_token_base64"
// remoteAddrStartupParam contains the remote address of the original client.
const remoteAddrStartupParam = "crdb:remote_addr"

// TenantResolver is an interface for the tenant directory. Currently only
// tenant.Directory implements it.
//
// TODO(jaylim-crl): Rename this to Directory, and the current tenant.Directory
// to tenant.directory. This needs to be moved into the tenant package as well.
// This is added here to aid testing.
type TenantResolver interface {
// EnsureTenantAddr returns an IP address of one of the given tenant's SQL
// processes based on the tenantID and clusterName fields. This should block
// until the process associated with the IP is ready.
//
// If no matching pods are found (e.g. cluster name mismatch, or tenant was
// deleted), this will return a GRPC NotFound error.
EnsureTenantAddr(
ctx context.Context,
tenantID roachpb.TenantID,
clusterName string,
) (string, error)

// LookupTenantAddrs returns the IP addresses for all available SQL
// processes for the given tenant. It returns a GRPC NotFound error if the
// tenant does not exist.
//
// Unlike EnsureTenantAddr which blocks until there is an associated
// process, LookupTenantAddrs will just return an empty set if no processes
// are available for the tenant.
LookupTenantAddrs(ctx context.Context, tenantID roachpb.TenantID) ([]string, error)

// ReportFailure is used to indicate to the resolver that a connection
// attempt to connect to a particular SQL tenant pod have failed.
ReportFailure(ctx context.Context, tenantID roachpb.TenantID, addr string) error
}

// connector is a per-session tenant-associated component that can be used to
// obtain a connection to the tenant cluster. This will also handle the
// authentication phase. All connections returned by the connector should
Expand All @@ -79,24 +47,24 @@ type connector struct {
ClusterName string
TenantID roachpb.TenantID

// Directory corresponds to the tenant directory, which will be used to
// resolve tenants to their corresponding IP addresses. If this isn't set,
// we will fallback to use RoutingRule.
// DirectoryCache corresponds to the tenant directory cache, which will be
// used to resolve tenants to their corresponding IP addresses. If this
// isn't set, we will fallback to use RoutingRule.
//
// TODO(jaylim-crl): Replace this with a Directory interface, and remove
// the RoutingRule field. RoutingRule should not be in here.
// TODO(jaylim-crl): Remove the RoutingRule field. RoutingRule should not
// be in here.
//
// NOTE: This field is optional.
Directory TenantResolver
DirectoryCache tenant.DirectoryCache

// RoutingRule refers to the static rule that will be used when resolving
// tenants. This will be used directly whenever the Directory field isn't
// specified, or as a fallback if one was specified.
// tenants. This will be used directly whenever the DirectoryCache field
// isn't specified, or as a fallback if one was specified.
//
// The literal "{{clusterName}}" will be replaced with ClusterName within
// the RoutingRule string.
//
// NOTE: This field is optional, if Directory isn't set.
// NOTE: This field is optional, if DirectoryCache isn't set.
RoutingRule string

// StartupMsg represents the startup message associated with the client.
Expand Down Expand Up @@ -263,11 +231,12 @@ func (c *connector) dialTenantCluster(ctx context.Context) (net.Conn, error) {
dialSQLServerErrs = 0
}

// Report the failure to the directory so that it can refresh
// any stale information that may have caused the problem.
if c.Directory != nil {
if err = reportFailureToDirectory(
ctx, c.TenantID, serverAddr, c.Directory,
// Report the failure to the directory cache so that it can
// refresh any stale information that may have caused the
// problem.
if c.DirectoryCache != nil {
if err = reportFailureToDirectoryCache(
ctx, c.TenantID, serverAddr, c.DirectoryCache,
); err != nil {
reportFailureErrs++
if reportFailureErr.ShouldLog() {
Expand Down Expand Up @@ -315,9 +284,9 @@ func (c *connector) lookupAddr(ctx context.Context) (string, error) {
return c.testingKnobs.lookupAddr(ctx)
}

// First try to lookup tenant in the directory (if available).
if c.Directory != nil {
addr, err := c.Directory.EnsureTenantAddr(ctx, c.TenantID, c.ClusterName)
// First try to lookup tenant in the directory cache (if available).
if c.DirectoryCache != nil {
addr, err := c.DirectoryCache.EnsureTenantAddr(ctx, c.TenantID, c.ClusterName)
switch {
case err == nil:
return addr, nil
Expand All @@ -337,7 +306,7 @@ func (c *connector) lookupAddr(ctx context.Context) (string, error) {
// map to a GRPC NotFound error.
//
// TODO(jaylim-crl): This code is temporary. Remove this once we have fully
// replaced this with a Directory interface. This fallback does not need
// replaced this with a Directory GRPC server. This fallback does not need
// to exist.
addr := strings.ReplaceAll(
c.RoutingRule, "{{clusterName}}",
Expand Down Expand Up @@ -407,10 +376,13 @@ func isRetriableConnectorError(err error) bool {
return errors.Is(err, errRetryConnectorSentinel)
}

// reportFailureToDirectory is a hookable function that calls the given tenant
// directory's ReportFailure method.
var reportFailureToDirectory = func(
ctx context.Context, tenantID roachpb.TenantID, addr string, directory TenantResolver,
// reportFailureToDirectoryCache is a hookable function that calls the given
// tenant directory cache's ReportFailure method.
var reportFailureToDirectoryCache = func(
ctx context.Context,
tenantID roachpb.TenantID,
addr string,
directoryCache tenant.DirectoryCache,
) error {
return directory.ReportFailure(ctx, tenantID, addr)
return directoryCache.ReportFailure(ctx, tenantID, addr)
}
30 changes: 16 additions & 14 deletions pkg/ccl/sqlproxyccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -434,7 +435,7 @@ func TestConnector_dialTenantCluster(t *testing.T) {
c := &connector{
TenantID: roachpb.MakeTenantID(42),
}
c.Directory = &testTenantResolver{
c.DirectoryCache = &testTenantDirectoryCache{
reportFailureFn: func(fnCtx context.Context, tenantID roachpb.TenantID, addr string) error {
reportFailureFnCount++
require.Equal(t, ctx, fnCtx)
Expand Down Expand Up @@ -532,7 +533,7 @@ func TestConnector_lookupAddr(t *testing.T) {
ClusterName: "my-foo",
TenantID: roachpb.MakeTenantID(10),
}
c.Directory = &testTenantResolver{
c.DirectoryCache = &testTenantDirectoryCache{
ensureTenantAddrFn: func(fnCtx context.Context, tenantID roachpb.TenantID, clusterName string) (string, error) {
ensureTenantAddrFnCount++
require.Equal(t, ctx, fnCtx)
Expand All @@ -555,7 +556,7 @@ func TestConnector_lookupAddr(t *testing.T) {
TenantID: roachpb.MakeTenantID(10),
RoutingRule: "foo.bar",
}
c.Directory = &testTenantResolver{
c.DirectoryCache = &testTenantDirectoryCache{
ensureTenantAddrFn: func(fnCtx context.Context, tenantID roachpb.TenantID, clusterName string) (string, error) {
ensureTenantAddrFnCount++
require.Equal(t, ctx, fnCtx)
Expand Down Expand Up @@ -590,7 +591,7 @@ func TestConnector_lookupAddr(t *testing.T) {
TenantID: roachpb.MakeTenantID(10),
RoutingRule: "foo.bar",
}
c.Directory = &testTenantResolver{
c.DirectoryCache = &testTenantDirectoryCache{
ensureTenantAddrFn: func(fnCtx context.Context, tenantID roachpb.TenantID, clusterName string) (string, error) {
ensureTenantAddrFnCount++
require.Equal(t, ctx, fnCtx)
Expand All @@ -613,7 +614,7 @@ func TestConnector_lookupAddr(t *testing.T) {
TenantID: roachpb.MakeTenantID(10),
RoutingRule: "foo.bar",
}
c.Directory = &testTenantResolver{
c.DirectoryCache = &testTenantDirectoryCache{
ensureTenantAddrFn: func(fnCtx context.Context, tenantID roachpb.TenantID, clusterName string) (string, error) {
ensureTenantAddrFnCount++
require.Equal(t, ctx, fnCtx)
Expand Down Expand Up @@ -731,31 +732,32 @@ func TestRetriableConnectorError(t *testing.T) {
require.True(t, errors.Is(err, errRetryConnectorSentinel))
}

var _ TenantResolver = &testTenantResolver{}
var _ tenant.DirectoryCache = &testTenantDirectoryCache{}

// testTenantResolver is a test implementation of the tenant resolver.
type testTenantResolver struct {
// testTenantDirectoryCache is a test implementation of the tenant directory
// cache.
type testTenantDirectoryCache struct {
ensureTenantAddrFn func(ctx context.Context, tenantID roachpb.TenantID, clusterName string) (string, error)
lookupTenantAddrsFn func(ctx context.Context, tenantID roachpb.TenantID) ([]string, error)
reportFailureFn func(ctx context.Context, tenantID roachpb.TenantID, addr string) error
}

// EnsureTenantAddr implements the TenantResolver interface.
func (r *testTenantResolver) EnsureTenantAddr(
// EnsureTenantAddr implements the DirectoryCache interface.
func (r *testTenantDirectoryCache) EnsureTenantAddr(
ctx context.Context, tenantID roachpb.TenantID, clusterName string,
) (string, error) {
return r.ensureTenantAddrFn(ctx, tenantID, clusterName)
}

// LookupTenantAddrs implements the TenantResolver interface.
func (r *testTenantResolver) LookupTenantAddrs(
// LookupTenantAddrs implements the DirectoryCache interface.
func (r *testTenantDirectoryCache) LookupTenantAddrs(
ctx context.Context, tenantID roachpb.TenantID,
) ([]string, error) {
return r.lookupTenantAddrsFn(ctx, tenantID)
}

// ReportFailure implements the TenantResolver interface.
func (r *testTenantResolver) ReportFailure(
// ReportFailure implements the DirectoryCache interface.
func (r *testTenantDirectoryCache) ReportFailure(
ctx context.Context, tenantID roachpb.TenantID, addr string,
) error {
return r.reportFailureFn(ctx, tenantID, addr)
Expand Down
12 changes: 6 additions & 6 deletions pkg/ccl/sqlproxyccl/proxy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ type proxyHandler struct {
// idleMonitor will detect idle connections to DRAINING pods.
idleMonitor *idle.Monitor

// directory is optional and if set, will be used to resolve
// backend id to IP addresses.
directory *tenant.Directory
// directoryCache is optional and if set, will be used to resolve tenants
// to their IP addresses.
directoryCache tenant.DirectoryCache

// CertManger keeps up to date the certificates used.
certManager *certmgr.CertManager
Expand Down Expand Up @@ -196,7 +196,7 @@ func newProxyHandler(
}

client := tenant.NewDirectoryClient(conn)
handler.directory, err = tenant.NewDirectory(ctx, stopper, client, dirOpts...)
handler.directoryCache, err = tenant.NewDirectoryCache(ctx, stopper, client, dirOpts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -280,8 +280,8 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn *proxyConn
RoutingRule: handler.RoutingRule,
StartupMsg: backendStartupMsg,
}
if handler.directory != nil {
connector.Directory = handler.directory
if handler.directoryCache != nil {
connector.DirectoryCache = handler.directoryCache
}

// TLS options for the proxy are split into Insecure and SkipVerify.
Expand Down
9 changes: 5 additions & 4 deletions pkg/ccl/sqlproxyccl/proxy_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/denylist"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenantdirsvr"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -676,17 +677,17 @@ func TestDirectoryConnect(t *testing.T) {

// Ensure that Directory.ReportFailure is being called correctly.
countReports := 0
defer testutils.TestingHook(&reportFailureToDirectory, func(
ctx context.Context, tenantID roachpb.TenantID, addr string, directory TenantResolver,
defer testutils.TestingHook(&reportFailureToDirectoryCache, func(
ctx context.Context, tenantID roachpb.TenantID, addr string, directoryCache tenant.DirectoryCache,
) error {
require.Equal(t, roachpb.MakeTenantID(28), tenantID)
addrs, err := directory.LookupTenantAddrs(ctx, tenantID)
addrs, err := directoryCache.LookupTenantAddrs(ctx, tenantID)
require.NoError(t, err)
require.Len(t, addrs, 1)
require.Equal(t, addrs[0], addr)

countReports++
err = directory.ReportFailure(ctx, tenantID, addr)
err = directoryCache.ReportFailure(ctx, tenantID, addr)
require.NoError(t, err)
return err
})()
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/sqlproxyccl/tenant/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ go_proto_library(
go_library(
name = "tenant",
srcs = [
"directory.go",
"directory_cache.go",
"entry.go",
"pod.go",
],
Expand All @@ -47,7 +47,7 @@ go_test(
name = "tenant_test",
size = "large",
srcs = [
"directory_test.go",
"directory_cache_test.go",
"main_test.go",
"pod_test.go",
],
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/sqlproxyccl/tenant/directory.proto
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 The Cockroach Authors.
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
Expand Down
Loading