Skip to content

Commit

Permalink
sqlproxyccl: rework sqlproxy connection throttler
Browse files Browse the repository at this point in the history
This change switches the sqlproxy connection throttling logic back to
exponential backoff. The tokenbucket approach was introduced by
PR #69041. There are a few behavior differences between this and the
original exponential backoff implementation.

1. The throttling logic is maintained per (ip, tenant) instead of per
   (ip). Some platform as a service provides share a single outbound ip
   address between multiple clients. These users would occasionaly see
   throttling caused by a second user sharing their IP.
2. The throttling logic was triggered before there was an authentication
   failure. It takes ~100ms-1000ms to authenticate with the tenant
   process.  Any requests that arrived after the first request, but
   before it was processed, would trigger the throttle. Now, we only
   trigger the throttle in response to an explict authorization error.

Release note: None
  • Loading branch information
jeffswenson committed Oct 11, 2021
1 parent 41ee2e9 commit 55ba27e
Show file tree
Hide file tree
Showing 17 changed files with 465 additions and 428 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ go_test(
"//pkg/ccl/sqlproxyccl/denylist",
"//pkg/ccl/sqlproxyccl/tenant",
"//pkg/ccl/sqlproxyccl/tenantdirsvr",
"//pkg/ccl/sqlproxyccl/throttler",
"//pkg/ccl/utilccl",
"//pkg/roachpb:with-mocks",
"//pkg/security",
Expand Down
19 changes: 18 additions & 1 deletion pkg/ccl/sqlproxyccl/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ package sqlproxyccl
import (
"net"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler"
"github.com/jackc/pgproto3/v2"
)

// authenticate handles the startup of the pgwire protocol to the point where
// the connections is considered authenticated. If that doesn't happen, it
// returns an error.
var authenticate = func(clientConn, crdbConn net.Conn) error {
var authenticate = func(clientConn, crdbConn net.Conn, throttleHook func(throttler.AttemptStatus) *pgproto3.ErrorResponse) error {
fe := pgproto3.NewBackend(pgproto3.NewChunkReader(clientConn), clientConn)
be := pgproto3.NewFrontend(pgproto3.NewChunkReader(crdbConn), crdbConn)

Expand All @@ -32,6 +33,22 @@ var authenticate = func(clientConn, crdbConn net.Conn) error {
return newErrorf(codeBackendReadFailed, "unable to receive message from backend: %v", err)
}

var throttleError *pgproto3.ErrorResponse
switch backendMsg.(type) {
case *pgproto3.AuthenticationOk:
throttleError = throttleHook(throttler.AttemptOK)
case *pgproto3.ErrorResponse:
throttleError = throttleHook(throttler.AttemptInvalidCredentials)
}
if throttleError != nil {
// If the connection is throttled, do not forward the response from the server to the
// client. Send a generic throttle error to the client instead.
if err = fe.Send(throttleError); err != nil {
return newErrorf(codeClientWriteFailed, "unable to send throttle error to client: %v", err)
}
return newErrorf(codeProxyRefusedConnection, "connection attempt throttled")
}

err = fe.Send(backendMsg)
if err != nil {
return newErrorf(
Expand Down
82 changes: 78 additions & 4 deletions pkg/ccl/sqlproxyccl/authentication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,17 @@ import (
"net"
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/jackc/pgproto3/v2"
"github.com/stretchr/testify/require"
)

var nilThrottleHook = func(state throttler.AttemptStatus) *pgproto3.ErrorResponse {
return nil
}

func TestAuthenticateOK(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand All @@ -33,7 +38,7 @@ func TestAuthenticateOK(t *testing.T) {
require.Equal(t, beMsg, &pgproto3.ReadyForQuery{})
}()

require.NoError(t, authenticate(srv, cli))
require.NoError(t, authenticate(srv, cli, nilThrottleHook))
}

func TestAuthenticateClearText(t *testing.T) {
Expand Down Expand Up @@ -75,7 +80,76 @@ func TestAuthenticateClearText(t *testing.T) {
require.Equal(t, beMsg, &pgproto3.ReadyForQuery{})
}()

require.NoError(t, authenticate(srv, cli))
require.NoError(t, authenticate(srv, cli, nilThrottleHook))
}

func TestAuthenticateThrottled(t *testing.T) {
defer leaktest.AfterTest(t)()

server := func(t *testing.T, be *pgproto3.Backend, authResponse pgproto3.BackendMessage) {
require.NoError(t, be.Send(&pgproto3.AuthenticationCleartextPassword{}))

msg, err := be.Receive()
require.NoError(t, err)
require.Equal(t, msg, &pgproto3.PasswordMessage{Password: "password"})

require.NoError(t, be.Send(authResponse))
}

client := func(t *testing.T, fe *pgproto3.Frontend) {
msg, err := fe.Receive()
require.NoError(t, err)
require.Equal(t, msg, &pgproto3.AuthenticationCleartextPassword{})

require.NoError(t, fe.Send(&pgproto3.PasswordMessage{Password: "password"}))

msg, err = fe.Receive()
require.NoError(t, err)
require.Equal(t, msg, &pgproto3.ErrorResponse{Message: "throttled"})

// Try reading from the connection. This check ensures authorize
// swallowed the OK/Error response from the sql server.
_, err = fe.Receive()
require.Error(t, err)
}

type testCase struct {
name string
result pgproto3.BackendMessage
expectedStatus throttler.AttemptStatus
}
for _, tc := range []testCase{
{
name: "AuthenticationOkay",
result: &pgproto3.AuthenticationOk{},
expectedStatus: throttler.AttemptOK,
},
{
name: "AuthenticationError",
result: &pgproto3.ErrorResponse{Message: "wrong password"},
expectedStatus: throttler.AttemptInvalidCredentials,
},
} {
t.Run(tc.name, func(t *testing.T) {
proxyToServer, serverToProxy := net.Pipe()
proxyToClient, clientToProxy := net.Pipe()
sqlServer := pgproto3.NewBackend(pgproto3.NewChunkReader(serverToProxy), serverToProxy)
sqlClient := pgproto3.NewFrontend(pgproto3.NewChunkReader(clientToProxy), clientToProxy)

go server(t, sqlServer, &pgproto3.AuthenticationOk{})
go client(t, sqlClient)

err := authenticate(proxyToClient, proxyToServer, func(status throttler.AttemptStatus) *pgproto3.ErrorResponse {
require.Equal(t, throttler.AttemptOK, status)
return &pgproto3.ErrorResponse{Message: "throttled"}
})
require.Error(t, err)
require.Contains(t, err.Error(), "connection attempt throttled")

proxyToServer.Close()
proxyToClient.Close()
})
}
}

func TestAuthenticateError(t *testing.T) {
Expand All @@ -93,7 +167,7 @@ func TestAuthenticateError(t *testing.T) {
require.Equal(t, beMsg, &pgproto3.ErrorResponse{Severity: "FATAL", Code: "foo"})
}()

err := authenticate(srv, cli)
err := authenticate(srv, cli, nilThrottleHook)
require.Error(t, err)
codeErr := (*codeError)(nil)
require.True(t, errors.As(err, &codeErr))
Expand All @@ -115,7 +189,7 @@ func TestAuthenticateUnexpectedMessage(t *testing.T) {
require.Equal(t, beMsg, &pgproto3.BindComplete{})
}()

err := authenticate(srv, cli)
err := authenticate(srv, cli, nilThrottleHook)
require.Error(t, err)
codeErr := (*codeError)(nil)
require.True(t, errors.As(err, &codeErr))
Expand Down
35 changes: 19 additions & 16 deletions pkg/ccl/sqlproxyccl/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,7 @@ func updateMetricsAndSendErrToClient(err error, conn net.Conn, metrics *metrics)
SendErrToClient(conn, err)
}

// SendErrToClient will encode and pass back to the SQL client an error message.
// It can be called by the implementors of proxyHandler to give more
// information to the end user in case of a problem.
var SendErrToClient = func(conn net.Conn, err error) {
if err == nil || conn == nil {
return
}
func toPgError(err error) *pgproto3.ErrorResponse {
codeErr := (*codeError)(nil)
if errors.As(err, &codeErr) {
var msg string
Expand All @@ -60,19 +54,28 @@ var SendErrToClient = func(conn net.Conn, err error) {
} else {
pgCode = "08004" // rejected connection
}
_, _ = conn.Write((&pgproto3.ErrorResponse{
return &pgproto3.ErrorResponse{
Severity: "FATAL",
Code: pgCode,
Message: msg,
}).Encode(nil))
} else {
// Return a generic "internal server error" message.
_, _ = conn.Write((&pgproto3.ErrorResponse{
Severity: "FATAL",
Code: "08004", // rejected connection
Message: "internal server error",
}).Encode(nil))
}
}
// Return a generic "internal server error" message.
return &pgproto3.ErrorResponse{
Severity: "FATAL",
Code: "08004", // rejected connection
Message: "internal server error",
}
}

// SendErrToClient will encode and pass back to the SQL client an error message.
// It can be called by the implementors of proxyHandler to give more
// information to the end user in case of a problem.
var SendErrToClient = func(conn net.Conn, err error) {
if err == nil || conn == nil {
return
}
_, _ = conn.Write(toPgError(err).Encode(nil))
}

// ConnectionCopy does a bi-directional copy between the backend and frontend
Expand Down
26 changes: 17 additions & 9 deletions pkg/ccl/sqlproxyccl/proxy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,9 @@ type ProxyOptions struct {
// DrainTimeout if set, will close DRAINING connections that have been idle
// for this duration.
DrainTimeout time.Duration

// Token bucket policy used to throttle (IP, TenantID) connection pairs that
// have no history of successful authentication.
ThrottlePolicy throttler.BucketPolicy
// ThrottleBaseDelay is the initial exponential backoff triggered in
// response to the first connection failure.
ThrottleBaseDelay time.Duration
}

// proxyHandler is the default implementation of a proxy handler.
Expand Down Expand Up @@ -135,6 +134,8 @@ type proxyHandler struct {
certManager *certmgr.CertManager
}

var throttledError = newErrorf(codeProxyRefusedConnection, "connection attempt throttled")

// newProxyHandler will create a new proxy handler with configuration based on
// the provided options.
func newProxyHandler(
Expand Down Expand Up @@ -163,7 +164,7 @@ func newProxyHandler(
}

handler.throttleService = throttler.NewLocalService(
throttler.WithPolicy(handler.ThrottlePolicy),
throttler.WithBaseDelay(handler.ThrottleBaseDelay),
)

if handler.DirectoryAddr != "" {
Expand Down Expand Up @@ -260,9 +261,10 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn *proxyConn
defer removeListener()

throttleTags := throttler.ConnectionTags{IP: ipAddr, TenantID: tenID.String()}
if err := handler.throttleService.LoginCheck(throttleTags); err != nil {
throttleTime, err := handler.throttleService.LoginCheck(throttleTags)
if err != nil {
log.Errorf(ctx, "throttler refused connection: %v", err.Error())
err = newErrorf(codeProxyRefusedConnection, "connection attempt throttled")
err = throttledError
updateMetricsAndSendErrToClient(err, conn, handler.metrics)
return err
}
Expand Down Expand Up @@ -397,14 +399,20 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn *proxyConn
defer func() { _ = crdbConn.Close() }()

// Perform user authentication.
if err := authenticate(conn, crdbConn); err != nil {
if err := authenticate(conn, crdbConn, func(status throttler.AttemptStatus) *pgproto3.ErrorResponse {
err := handler.throttleService.ReportAttempt(throttleTags, throttleTime, status)
if err != nil {
log.Errorf(ctx, "throttler refused connection after authentication: %v", err.Error())
return toPgError(throttledError)
}
return nil
}); err != nil {
handler.metrics.updateForError(err)
log.Ops.Errorf(ctx, "authenticate: %s", err)
return err
}

handler.metrics.SuccessfulConnCount.Inc(1)
handler.throttleService.ReportSuccess(throttleTags)

ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/sqlproxyccl/proxy_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/denylist"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenantdirsvr"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand Down Expand Up @@ -979,8 +980,8 @@ func newTester() *tester {
// Record successful connection and authentication.
originalAuthenticate := authenticate
te.restoreAuthenticate =
testutils.TestingHook(&authenticate, func(clientConn, crdbConn net.Conn) error {
err := originalAuthenticate(clientConn, crdbConn)
testutils.TestingHook(&authenticate, func(clientConn, crdbConn net.Conn, throttleHook func(status throttler.AttemptStatus) *pgproto3.ErrorResponse) error {
err := originalAuthenticate(clientConn, crdbConn, throttleHook)
te.setAuthenticated(err == nil)
return err
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/sqlproxyccl/throttler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go_library(
srcs = [
"local.go",
"service.go",
"token_bucket.go",
"throttle.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler",
visibility = ["//visibility:public"],
Expand All @@ -20,7 +20,7 @@ go_test(
name = "throttler_test",
srcs = [
"local_test.go",
"token_bucket_test.go",
"throttle_test.go",
],
embed = [":throttler"],
deps = [
Expand Down
Loading

0 comments on commit 55ba27e

Please sign in to comment.