Skip to content

Commit

Permalink
ccl/sqlproxyccl: throttle logging of all high-frequency errors
Browse files Browse the repository at this point in the history
In #134613, we introduced rate-limiting for throttler errors, but deferred
handling of errors caused by connections blocked by misconfigured access
control lists (ACLs). These errors, often due to incorrect CIDR ranges or
private endpoints, can lead to excessive retries and logging noise.

This commit addresses that issue by introducing a new log-limiting mechanism
for high-frequency errors based on an (IP, tenant) pair. The following cases
are now covered:

1. Refused connections (ACL misconfigurations) - excessive retries from
   disallowed IPs or private endpoint IDs.
2. Auth throttling (invalid logins) - throttling errors due to invalid login
   attempts.
3. Deleted/invalid cluster - errors when a deleted tenant still receives
   request.

This change is internal, so no release note is required.

Epic: none

Release note: None
  • Loading branch information
jaylim-crl committed Nov 12, 2024
1 parent 67bc815 commit bc1a949
Show file tree
Hide file tree
Showing 16 changed files with 277 additions and 86 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"//pkg/security/certmgr",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgwirebase",
"//pkg/util/cache",
"//pkg/util/grpcutil",
"//pkg/util/httputil",
"//pkg/util/log",
Expand Down
9 changes: 2 additions & 7 deletions pkg/ccl/sqlproxyccl/acl/cidr_ranges.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,6 @@ func (p *CIDRRanges) CheckConnection(ctx context.Context, conn ConnectionTags) e
}
}

// By default, connections are rejected if no ranges match the connection's
// IP.
return errors.Newf(
"connection to '%s' denied: cluster does not allow public connections from IP %s",
conn.TenantID.String(),
conn.IP,
)
// By default, connections are rejected if no ranges match the connection's IP.
return errors.Newf("cluster does not allow public connections from IP %s", conn.IP)
}
4 changes: 2 additions & 2 deletions pkg/ccl/sqlproxyccl/acl/cidr_ranges_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestCIDRRanges(t *testing.T) {
},
}
err := p.CheckConnection(ctx, makeConn(""))
require.EqualError(t, err, "connection to '42' denied: cluster does not allow public connections from IP 127.0.0.1")
require.EqualError(t, err, "cluster does not allow public connections from IP 127.0.0.1")
})

t.Run("default behavior if no entries", func(t *testing.T) {
Expand All @@ -75,7 +75,7 @@ func TestCIDRRanges(t *testing.T) {
},
}
err := p.CheckConnection(ctx, makeConn(""))
require.EqualError(t, err, "connection to '42' denied: cluster does not allow public connections from IP 127.0.0.1")
require.EqualError(t, err, "cluster does not allow public connections from IP 127.0.0.1")
})

t.Run("good public connection", func(t *testing.T) {
Expand Down
6 changes: 1 addition & 5 deletions pkg/ccl/sqlproxyccl/acl/private_endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,7 @@ func (p *PrivateEndpoints) CheckConnection(ctx context.Context, conn ConnectionT

// By default, connections are rejected if no endpoints match the
// connection's endpoint ID.
return errors.Newf(
"connection to '%s' denied: cluster does not allow private connections from endpoint '%s'",
conn.TenantID.String(),
conn.EndpointID,
)
return errors.Newf("cluster does not allow private connections from endpoint '%s'", conn.EndpointID)
}

// FindPrivateEndpointID looks for the endpoint identifier within the connection
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/sqlproxyccl/acl/private_endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestPrivateEndpoints(t *testing.T) {
},
}
err := p.CheckConnection(ctx, makeConn("bar"))
require.EqualError(t, err, "connection to '42' denied: cluster does not allow private connections from endpoint 'bar'")
require.EqualError(t, err, "cluster does not allow private connections from endpoint 'bar'")
})

t.Run("default behavior if no entries", func(t *testing.T) {
Expand All @@ -78,7 +78,7 @@ func TestPrivateEndpoints(t *testing.T) {
},
}
err := p.CheckConnection(ctx, makeConn("bar"))
require.EqualError(t, err, "connection to '42' denied: cluster does not allow private connections from endpoint 'bar'")
require.EqualError(t, err, "cluster does not allow private connections from endpoint 'bar'")
})

t.Run("good private connection", func(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/sqlproxyccl/acl/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestACLWatcher(t *testing.T) {
}

remove, err := watcher.ListenForDenied(ctx, connection, noError(t))
require.EqualError(t, err, "connection to '10' denied: cluster does not allow private connections from endpoint 'random-connection'")
require.EqualError(t, err, "cluster does not allow private connections from endpoint 'random-connection'")
require.Nil(t, remove)
})

Expand All @@ -171,7 +171,7 @@ func TestACLWatcher(t *testing.T) {
}

remove, err := watcher.ListenForDenied(ctx, connection, noError(t))
require.EqualError(t, err, "connection to '10' denied: cluster does not allow public connections from IP 127.0.0.1")
require.EqualError(t, err, "cluster does not allow public connections from IP 127.0.0.1")
require.Nil(t, remove)
})

Expand Down Expand Up @@ -283,7 +283,7 @@ func TestACLWatcher(t *testing.T) {
// Emit the same item.
c <- pe

require.EqualError(t, <-errorChan, "connection to '10' denied: cluster does not allow private connections from endpoint 'cockroachdb'")
require.EqualError(t, <-errorChan, "cluster does not allow private connections from endpoint 'cockroachdb'")
})

t.Run("connection is denied by update to public cidr ranges", func(t *testing.T) {
Expand Down Expand Up @@ -334,7 +334,7 @@ func TestACLWatcher(t *testing.T) {
// Emit the same item.
c <- pcr

require.EqualError(t, <-errorChan, "connection to '10' denied: cluster does not allow public connections from IP 1.1.2.2")
require.EqualError(t, <-errorChan, "cluster does not allow public connections from IP 1.1.2.2")
})

t.Run("unregister removes listeners", func(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions pkg/ccl/sqlproxyccl/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ var authenticate = func(
case *pgproto3.AuthenticationOk:
throttleError := throttleHook(throttler.AttemptOK)
if throttleError != nil {
if err = feSend(toPgError(throttleError)); err != nil {
// Send a user-facing error.
if err = feSend(toPgError(authThrottledError)); err != nil {
return nil, err
}
return nil, throttleError
Expand All @@ -125,7 +126,8 @@ var authenticate = func(
throttleError = throttleHook(throttler.AttemptInvalidCredentials)
}
if throttleError != nil {
if err = feSend(toPgError(throttleError)); err != nil {
// Send a user-facing error.
if err = feSend(toPgError(authThrottledError)); err != nil {
return nil, err
}
return nil, throttleError
Expand Down
15 changes: 11 additions & 4 deletions pkg/ccl/sqlproxyccl/authentication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/testutilsccl"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/jackc/pgproto3/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -140,13 +141,19 @@ func TestAuthenticateThrottled(t *testing.T) {
go server(t, sqlServer)
go client(t, sqlClient)

_, err := authenticate(proxyToClient, proxyToServer, nil, /* proxyBackendKeyData */
// The error returned from authenticate should be different from the error
// received at the client.
_, err := authenticate(
proxyToClient,
proxyToServer,
nil, /* proxyBackendKeyData */
func(status throttler.AttemptStatus) error {
require.Equal(t, throttler.AttemptInvalidCredentials, status)
return authThrottledError
})
return errors.New("request denied")
},
)
require.Error(t, err)
require.Contains(t, err.Error(), "too many failed authentication attempts")
require.Contains(t, err.Error(), "request denied")

proxyToServer.Close()
proxyToClient.Close()
Expand Down
15 changes: 10 additions & 5 deletions pkg/ccl/sqlproxyccl/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ const (
// received from the client after TLS negotiation.
codeUnexpectedStartupMessage

// codeParamsRoutingFailed indicates an error choosing a backend address based
// on the client's session parameters.
// codeParamsRoutingFailed indicates an error choosing a backend address
// based on the client's session parameters.
codeParamsRoutingFailed

// codeBackendDialFailed indicates an error establishing a connection
Expand All @@ -59,24 +59,29 @@ const (
// (with a connection error) while in a session with backend SQL server.
codeClientDisconnected

// codeProxyRefusedConnection indicates that the proxy refused the connection
// request due to high load or too many connection attempts.
// codeProxyRefusedConnection indicates that the proxy refused the
// connection request due to too many invalid connection attempts, or
// because the incoming connection session does not match the ACL rules
// for the cluster.
codeProxyRefusedConnection

// codeExpiredClientConnection indicates that proxy connection to the client
// has expired and should be closed.
codeExpiredClientConnection

// codeUnavailable indicates that the backend SQL server exists but is not
// accepting connections. For example, a tenant cluster that has maxPods set to 0.
// accepting connections. For example, a tenant cluster that has maxPods
// set to 0.
codeUnavailable
)

// errWithCode combines an error with one of the above codes to ease
// the processing of the errors.
//
// This follows the same pattern used by cockroachdb/errors that allows
// decorating errors with additional information. Check WithStack, WithHint,
// WithDetail etc.
//
// By using the pattern, the decorations are chained and allow searching and
// extracting later on. See getErrorCode bellow.
type errWithCode struct {
Expand Down
84 changes: 57 additions & 27 deletions pkg/ccl/sqlproxyccl/proxy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ var (
// smaller than 100 characters. We don't perform an exact match here for
// more flexibility.
clusterNameRegex = regexp.MustCompile("^[a-z0-9][a-z0-9-]{4,98}[a-z0-9]$")

// highFreqErrorMarker is a marker that indicates that a particular error
// is of high-frequency and should be throttled accordingly. Use with
// errors.Mark and errors.Is.
highFreqErrorMarker = errors.New("high-frequency error")
)

const (
Expand Down Expand Up @@ -380,6 +385,19 @@ func (handler *proxyHandler) handle(
return errors.Wrap(err, "extracting cluster identifier")
}

// Add optional request tags so that callers can provide a better context
// for errors. This is best-effort only, and will be applied if the incoming
// context includes a requestTags object.
reqTags := requestTagsFromContext(ctx)

// Tenant ID is always valid in the request. We will defer the addition of
// cluster name into the logtags after validating the connection since there
// is a possibility where the cluster name won't match.
ctx = logtags.AddTag(ctx, "tenant", tenID.String())
if reqTags != nil {
reqTags["tenant"] = tenID.String()
}

// Validate the incoming connection and ensure that the cluster name
// matches the tenant's. This avoids malicious actors from attempting to
// connect to the cluster using just the tenant ID.
Expand All @@ -389,16 +407,13 @@ func (handler *proxyHandler) handle(
return err
}

// Only add logtags after validating the connection. If the connection isn't
// Add cluster name after validating the connection. If the connection isn't
// validated, clusterName may not match the tenant ID, and this could cause
// confusion when analyzing logs.
ctx = logtags.AddTag(ctx, "cluster", clusterName)
ctx = logtags.AddTag(ctx, "tenant", tenID)

// Add request tags so that callers can provide a better context for errors.
reqTags := requestTagsFromContext(ctx)
reqTags["cluster"] = clusterName
reqTags["tenant"] = tenID
if reqTags != nil {
reqTags["cluster"] = clusterName
}

// Use an empty string as the default port as we only care about the
// correctly parsing the IP address here.
Expand All @@ -418,8 +433,15 @@ func (handler *proxyHandler) handle(
EndpointID: endpointID,
},
func(err error) {
// Whenever a connection expires, it will be closed by the proxy
// once the ACL watcher fires. The client will observe that the
// connection has been closed abruptly. One UX enhancement that
// could be made here is to send an ErrorResponse on the next pgwire
// message boundary to the client so that users know why their
// connections were closed. Implementing this will require
// communicating with the forwarder (e.g. "sendErrorSafelyAndClose").
err = withCode(
errors.Wrap(err, "connection blocked by access control list"),
errors.Wrap(err, "connection no longer allowed by access control list"),
codeExpiredClientConnection,
)
select {
Expand All @@ -434,11 +456,20 @@ func (handler *proxyHandler) handle(
// "connection refused" error. The next time they connect, they will
// get a "not found" error.
//
// TODO(jaylim-crl): We can enrich this error with the proper reason on
// why they were refused (e.g. IP allowlist, or private endpoints).
// We will return a generic "connection refused" error in this case.
// From a security perspective, it's preferable not to disclose specific
// details -- similar to how we use "invalid username or password"
// rather than just "invalid password". This does come with its own
// drawbacks (e.g. users may not know why), but that's fine, since they
// should have all the necessary information needed to debug ths issue.
clientErr := withCode(errors.New("connection refused"), codeProxyRefusedConnection)
updateMetricsAndSendErrToClient(clientErr, fe.Conn, handler.metrics)
return errors.Wrap(err, "connection blocked by access control list")

// Users often misconfigure their allowed CIDR ranges or private
// endpoints, leading to excessive retry traffic. We will mark it as
// a high-frequency error so that callers that log errors could
// implement rate limiting accordingly.
return errors.Mark(errors.Wrap(err, "connection blocked by access control list"), highFreqErrorMarker)
}
defer removeListener()

Expand All @@ -447,13 +478,12 @@ func (handler *proxyHandler) handle(
if err != nil {
clientErr := authThrottledError
updateMetricsAndSendErrToClient(clientErr, fe.Conn, handler.metrics)

// The throttle service is used to rate limit invalid login attempts
// from IP addresses, and it is commonly prone to generating excessive
// traffic in practice. Due to that, we'll return a nil here to prevent
// callers from logging this request. However, LoginCheck itself
// periodically logs an error when such requests are rate limited, so
// we won't miss any signals by doing this.
return nil //nolint:returnerrcheck
// traffic in practice. Due to that, we'll mark it as a high-frequency
// error to rate-limit logs.
return errors.Mark(err, highFreqErrorMarker)
}

connector := &connector{
Expand Down Expand Up @@ -482,13 +512,9 @@ func (handler *proxyHandler) handle(

crdbConn, sentToClient, err := connector.OpenTenantConnWithAuth(ctx, f, fe.Conn,
func(status throttler.AttemptStatus) error {
if err := handler.throttleService.ReportAttempt(
ctx, throttleTags, throttleTime, status,
); err != nil {
// We have to log here because errors returned by this closure
// will be sent to the client.
log.Errorf(ctx, "throttler refused connection after authentication: %v", err.Error())
return authThrottledError
err := handler.throttleService.ReportAttempt(ctx, throttleTags, throttleTime, status)
if err != nil {
return errors.Mark(err, highFreqErrorMarker)
}
return nil
},
Expand Down Expand Up @@ -579,6 +605,7 @@ func (handler *proxyHandler) validateConnection(
if err != nil && status.Code(err) != codes.NotFound {
return err
}

if err == nil {
if tenant.ClusterName == "" || tenant.ClusterName == clusterName {
return nil
Expand All @@ -590,10 +617,13 @@ func (handler *proxyHandler) validateConnection(
tenant.ClusterName,
)
}
return withCode(
errors.Newf("cluster %s-%d not found", clusterName, tenantID.ToUint64()),
codeParamsRoutingFailed,
)

// The codes.NotFound error is a common occurrence, as we have often
// observed situations where a user deletes their tenant, but their
// application continues running.
clientErr := errors.Newf("cluster %s-%d not found", clusterName, tenantID.ToUint64())
clientErr = errors.Mark(clientErr, highFreqErrorMarker)
return withCode(clientErr, codeParamsRoutingFailed)
}

// handleCancelRequest handles a pgwire query cancel request by either
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/sqlproxyccl/proxy_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,12 @@ func TestProxyHandler_ValidateConnection(t *testing.T) {
t.Run("not found/no cluster name", func(t *testing.T) {
err := s.handler.validateConnection(ctx, invalidTenantID, "")
require.Regexp(t, "codeParamsRoutingFailed: cluster -99 not found", err.Error())
require.True(t, errors.Is(err, highFreqErrorMarker))
})
t.Run("not found", func(t *testing.T) {
err := s.handler.validateConnection(ctx, invalidTenantID, "foo-bar")
require.Regexp(t, "codeParamsRoutingFailed: cluster foo-bar-99 not found", err.Error())
require.True(t, errors.Is(err, highFreqErrorMarker))
})
t.Run("found/tenant without name", func(t *testing.T) {
err := s.handler.validateConnection(ctx, tenantWithoutNameID, "foo-bar")
Expand All @@ -116,10 +118,12 @@ func TestProxyHandler_ValidateConnection(t *testing.T) {
t.Run("found/connection without name", func(t *testing.T) {
err := s.handler.validateConnection(ctx, tenantID, "")
require.Regexp(t, "codeParamsRoutingFailed: cluster -10 not found", err.Error())
require.True(t, errors.Is(err, highFreqErrorMarker))
})
t.Run("found/tenant name mismatch", func(t *testing.T) {
err := s.handler.validateConnection(ctx, tenantID, "foo-bar")
require.Regexp(t, "codeParamsRoutingFailed: cluster foo-bar-10 not found", err.Error())
require.True(t, errors.Is(err, highFreqErrorMarker))
})

// Stop the directory server.
Expand All @@ -130,6 +134,7 @@ func TestProxyHandler_ValidateConnection(t *testing.T) {
// Use a new tenant ID here to force GetTenant.
err := s.handler.validateConnection(ctx, roachpb.MustMakeTenantID(100), "")
require.Regexp(t, "directory server has not been started", err.Error())
require.False(t, errors.Is(err, highFreqErrorMarker))
})
}

Expand Down
Loading

0 comments on commit bc1a949

Please sign in to comment.