Skip to content

Commit

Permalink
Merge pull request #135219 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-24.2-135008

release-24.2: ccl/sqlproxyccl: throttle logging of all high-frequency errors
  • Loading branch information
jaylim-crl authored Nov 18, 2024
2 parents 2fa156c + 4f3e0cc commit 4db7acb
Show file tree
Hide file tree
Showing 16 changed files with 277 additions and 86 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"//pkg/security/certmgr",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgwirebase",
"//pkg/util/cache",
"//pkg/util/grpcutil",
"//pkg/util/httputil",
"//pkg/util/log",
Expand Down
9 changes: 2 additions & 7 deletions pkg/ccl/sqlproxyccl/acl/cidr_ranges.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,6 @@ func (p *CIDRRanges) CheckConnection(ctx context.Context, conn ConnectionTags) e
}
}

// By default, connections are rejected if no ranges match the connection's
// IP.
return errors.Newf(
"connection to '%s' denied: cluster does not allow public connections from IP %s",
conn.TenantID.String(),
conn.IP,
)
// By default, connections are rejected if no ranges match the connection's IP.
return errors.Newf("cluster does not allow public connections from IP %s", conn.IP)
}
4 changes: 2 additions & 2 deletions pkg/ccl/sqlproxyccl/acl/cidr_ranges_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestCIDRRanges(t *testing.T) {
},
}
err := p.CheckConnection(ctx, makeConn(""))
require.EqualError(t, err, "connection to '42' denied: cluster does not allow public connections from IP 127.0.0.1")
require.EqualError(t, err, "cluster does not allow public connections from IP 127.0.0.1")
})

t.Run("default behavior if no entries", func(t *testing.T) {
Expand All @@ -75,7 +75,7 @@ func TestCIDRRanges(t *testing.T) {
},
}
err := p.CheckConnection(ctx, makeConn(""))
require.EqualError(t, err, "connection to '42' denied: cluster does not allow public connections from IP 127.0.0.1")
require.EqualError(t, err, "cluster does not allow public connections from IP 127.0.0.1")
})

t.Run("good public connection", func(t *testing.T) {
Expand Down
6 changes: 1 addition & 5 deletions pkg/ccl/sqlproxyccl/acl/private_endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,7 @@ func (p *PrivateEndpoints) CheckConnection(ctx context.Context, conn ConnectionT

// By default, connections are rejected if no endpoints match the
// connection's endpoint ID.
return errors.Newf(
"connection to '%s' denied: cluster does not allow private connections from endpoint '%s'",
conn.TenantID.String(),
conn.EndpointID,
)
return errors.Newf("cluster does not allow private connections from endpoint '%s'", conn.EndpointID)
}

// FindPrivateEndpointID looks for the endpoint identifier within the connection
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/sqlproxyccl/acl/private_endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestPrivateEndpoints(t *testing.T) {
},
}
err := p.CheckConnection(ctx, makeConn("bar"))
require.EqualError(t, err, "connection to '42' denied: cluster does not allow private connections from endpoint 'bar'")
require.EqualError(t, err, "cluster does not allow private connections from endpoint 'bar'")
})

t.Run("default behavior if no entries", func(t *testing.T) {
Expand All @@ -78,7 +78,7 @@ func TestPrivateEndpoints(t *testing.T) {
},
}
err := p.CheckConnection(ctx, makeConn("bar"))
require.EqualError(t, err, "connection to '42' denied: cluster does not allow private connections from endpoint 'bar'")
require.EqualError(t, err, "cluster does not allow private connections from endpoint 'bar'")
})

t.Run("good private connection", func(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/sqlproxyccl/acl/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestACLWatcher(t *testing.T) {
}

remove, err := watcher.ListenForDenied(ctx, connection, noError(t))
require.EqualError(t, err, "connection to '10' denied: cluster does not allow private connections from endpoint 'random-connection'")
require.EqualError(t, err, "cluster does not allow private connections from endpoint 'random-connection'")
require.Nil(t, remove)
})

Expand All @@ -171,7 +171,7 @@ func TestACLWatcher(t *testing.T) {
}

remove, err := watcher.ListenForDenied(ctx, connection, noError(t))
require.EqualError(t, err, "connection to '10' denied: cluster does not allow public connections from IP 127.0.0.1")
require.EqualError(t, err, "cluster does not allow public connections from IP 127.0.0.1")
require.Nil(t, remove)
})

Expand Down Expand Up @@ -283,7 +283,7 @@ func TestACLWatcher(t *testing.T) {
// Emit the same item.
c <- pe

require.EqualError(t, <-errorChan, "connection to '10' denied: cluster does not allow private connections from endpoint 'cockroachdb'")
require.EqualError(t, <-errorChan, "cluster does not allow private connections from endpoint 'cockroachdb'")
})

t.Run("connection is denied by update to public cidr ranges", func(t *testing.T) {
Expand Down Expand Up @@ -334,7 +334,7 @@ func TestACLWatcher(t *testing.T) {
// Emit the same item.
c <- pcr

require.EqualError(t, <-errorChan, "connection to '10' denied: cluster does not allow public connections from IP 1.1.2.2")
require.EqualError(t, <-errorChan, "cluster does not allow public connections from IP 1.1.2.2")
})

t.Run("unregister removes listeners", func(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions pkg/ccl/sqlproxyccl/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ var authenticate = func(
case *pgproto3.AuthenticationOk:
throttleError := throttleHook(throttler.AttemptOK)
if throttleError != nil {
if err = feSend(toPgError(throttleError)); err != nil {
// Send a user-facing error.
if err = feSend(toPgError(authThrottledError)); err != nil {
return nil, err
}
return nil, throttleError
Expand All @@ -125,7 +126,8 @@ var authenticate = func(
throttleError = throttleHook(throttler.AttemptInvalidCredentials)
}
if throttleError != nil {
if err = feSend(toPgError(throttleError)); err != nil {
// Send a user-facing error.
if err = feSend(toPgError(authThrottledError)); err != nil {
return nil, err
}
return nil, throttleError
Expand Down
15 changes: 11 additions & 4 deletions pkg/ccl/sqlproxyccl/authentication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/testutilsccl"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/jackc/pgproto3/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -140,13 +141,19 @@ func TestAuthenticateThrottled(t *testing.T) {
go server(t, sqlServer)
go client(t, sqlClient)

_, err := authenticate(proxyToClient, proxyToServer, nil, /* proxyBackendKeyData */
// The error returned from authenticate should be different from the error
// received at the client.
_, err := authenticate(
proxyToClient,
proxyToServer,
nil, /* proxyBackendKeyData */
func(status throttler.AttemptStatus) error {
require.Equal(t, throttler.AttemptInvalidCredentials, status)
return authThrottledError
})
return errors.New("request denied")
},
)
require.Error(t, err)
require.Contains(t, err.Error(), "too many failed authentication attempts")
require.Contains(t, err.Error(), "request denied")

proxyToServer.Close()
proxyToClient.Close()
Expand Down
15 changes: 10 additions & 5 deletions pkg/ccl/sqlproxyccl/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ const (
// received from the client after TLS negotiation.
codeUnexpectedStartupMessage

// codeParamsRoutingFailed indicates an error choosing a backend address based
// on the client's session parameters.
// codeParamsRoutingFailed indicates an error choosing a backend address
// based on the client's session parameters.
codeParamsRoutingFailed

// codeBackendDialFailed indicates an error establishing a connection
Expand All @@ -59,24 +59,29 @@ const (
// (with a connection error) while in a session with backend SQL server.
codeClientDisconnected

// codeProxyRefusedConnection indicates that the proxy refused the connection
// request due to high load or too many connection attempts.
// codeProxyRefusedConnection indicates that the proxy refused the
// connection request due to too many invalid connection attempts, or
// because the incoming connection session does not match the ACL rules
// for the cluster.
codeProxyRefusedConnection

// codeExpiredClientConnection indicates that proxy connection to the client
// has expired and should be closed.
codeExpiredClientConnection

// codeUnavailable indicates that the backend SQL server exists but is not
// accepting connections. For example, a tenant cluster that has maxPods set to 0.
// accepting connections. For example, a tenant cluster that has maxPods
// set to 0.
codeUnavailable
)

// errWithCode combines an error with one of the above codes to ease
// the processing of the errors.
//
// This follows the same pattern used by cockroachdb/errors that allows
// decorating errors with additional information. Check WithStack, WithHint,
// WithDetail etc.
//
// By using the pattern, the decorations are chained and allow searching and
// extracting later on. See getErrorCode bellow.
type errWithCode struct {
Expand Down
84 changes: 57 additions & 27 deletions pkg/ccl/sqlproxyccl/proxy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ var (
// smaller than 100 characters. We don't perform an exact match here for
// more flexibility.
clusterNameRegex = regexp.MustCompile("^[a-z0-9][a-z0-9-]{4,98}[a-z0-9]$")

// highFreqErrorMarker is a marker that indicates that a particular error
// is of high-frequency and should be throttled accordingly. Use with
// errors.Mark and errors.Is.
highFreqErrorMarker = errors.New("high-frequency error")
)

const (
Expand Down Expand Up @@ -380,6 +385,19 @@ func (handler *proxyHandler) handle(
return errors.Wrap(err, "extracting cluster identifier")
}

// Add optional request tags so that callers can provide a better context
// for errors. This is best-effort only, and will be applied if the incoming
// context includes a requestTags object.
reqTags := requestTagsFromContext(ctx)

// Tenant ID is always valid in the request. We will defer the addition of
// cluster name into the logtags after validating the connection since there
// is a possibility where the cluster name won't match.
ctx = logtags.AddTag(ctx, "tenant", tenID.String())
if reqTags != nil {
reqTags["tenant"] = tenID.String()
}

// Validate the incoming connection and ensure that the cluster name
// matches the tenant's. This avoids malicious actors from attempting to
// connect to the cluster using just the tenant ID.
Expand All @@ -389,16 +407,13 @@ func (handler *proxyHandler) handle(
return err
}

// Only add logtags after validating the connection. If the connection isn't
// Add cluster name after validating the connection. If the connection isn't
// validated, clusterName may not match the tenant ID, and this could cause
// confusion when analyzing logs.
ctx = logtags.AddTag(ctx, "cluster", clusterName)
ctx = logtags.AddTag(ctx, "tenant", tenID)

// Add request tags so that callers can provide a better context for errors.
reqTags := requestTagsFromContext(ctx)
reqTags["cluster"] = clusterName
reqTags["tenant"] = tenID
if reqTags != nil {
reqTags["cluster"] = clusterName
}

// Use an empty string as the default port as we only care about the
// correctly parsing the IP address here.
Expand All @@ -418,8 +433,15 @@ func (handler *proxyHandler) handle(
EndpointID: endpointID,
},
func(err error) {
// Whenever a connection expires, it will be closed by the proxy
// once the ACL watcher fires. The client will observe that the
// connection has been closed abruptly. One UX enhancement that
// could be made here is to send an ErrorResponse on the next pgwire
// message boundary to the client so that users know why their
// connections were closed. Implementing this will require
// communicating with the forwarder (e.g. "sendErrorSafelyAndClose").
err = withCode(
errors.Wrap(err, "connection blocked by access control list"),
errors.Wrap(err, "connection no longer allowed by access control list"),
codeExpiredClientConnection,
)
select {
Expand All @@ -434,11 +456,20 @@ func (handler *proxyHandler) handle(
// "connection refused" error. The next time they connect, they will
// get a "not found" error.
//
// TODO(jaylim-crl): We can enrich this error with the proper reason on
// why they were refused (e.g. IP allowlist, or private endpoints).
// We will return a generic "connection refused" error in this case.
// From a security perspective, it's preferable not to disclose specific
// details -- similar to how we use "invalid username or password"
// rather than just "invalid password". This does come with its own
// drawbacks (e.g. users may not know why), but that's fine, since they
// should have all the necessary information needed to debug ths issue.
clientErr := withCode(errors.New("connection refused"), codeProxyRefusedConnection)
updateMetricsAndSendErrToClient(clientErr, fe.Conn, handler.metrics)
return errors.Wrap(err, "connection blocked by access control list")

// Users often misconfigure their allowed CIDR ranges or private
// endpoints, leading to excessive retry traffic. We will mark it as
// a high-frequency error so that callers that log errors could
// implement rate limiting accordingly.
return errors.Mark(errors.Wrap(err, "connection blocked by access control list"), highFreqErrorMarker)
}
defer removeListener()

Expand All @@ -447,13 +478,12 @@ func (handler *proxyHandler) handle(
if err != nil {
clientErr := authThrottledError
updateMetricsAndSendErrToClient(clientErr, fe.Conn, handler.metrics)

// The throttle service is used to rate limit invalid login attempts
// from IP addresses, and it is commonly prone to generating excessive
// traffic in practice. Due to that, we'll return a nil here to prevent
// callers from logging this request. However, LoginCheck itself
// periodically logs an error when such requests are rate limited, so
// we won't miss any signals by doing this.
return nil //nolint:returnerrcheck
// traffic in practice. Due to that, we'll mark it as a high-frequency
// error to rate-limit logs.
return errors.Mark(err, highFreqErrorMarker)
}

connector := &connector{
Expand Down Expand Up @@ -482,13 +512,9 @@ func (handler *proxyHandler) handle(

crdbConn, sentToClient, err := connector.OpenTenantConnWithAuth(ctx, f, fe.Conn,
func(status throttler.AttemptStatus) error {
if err := handler.throttleService.ReportAttempt(
ctx, throttleTags, throttleTime, status,
); err != nil {
// We have to log here because errors returned by this closure
// will be sent to the client.
log.Errorf(ctx, "throttler refused connection after authentication: %v", err.Error())
return authThrottledError
err := handler.throttleService.ReportAttempt(ctx, throttleTags, throttleTime, status)
if err != nil {
return errors.Mark(err, highFreqErrorMarker)
}
return nil
},
Expand Down Expand Up @@ -579,6 +605,7 @@ func (handler *proxyHandler) validateConnection(
if err != nil && status.Code(err) != codes.NotFound {
return err
}

if err == nil {
if tenant.ClusterName == "" || tenant.ClusterName == clusterName {
return nil
Expand All @@ -590,10 +617,13 @@ func (handler *proxyHandler) validateConnection(
tenant.ClusterName,
)
}
return withCode(
errors.Newf("cluster %s-%d not found", clusterName, tenantID.ToUint64()),
codeParamsRoutingFailed,
)

// The codes.NotFound error is a common occurrence, as we have often
// observed situations where a user deletes their tenant, but their
// application continues running.
clientErr := errors.Newf("cluster %s-%d not found", clusterName, tenantID.ToUint64())
clientErr = errors.Mark(clientErr, highFreqErrorMarker)
return withCode(clientErr, codeParamsRoutingFailed)
}

// handleCancelRequest handles a pgwire query cancel request by either
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/sqlproxyccl/proxy_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,12 @@ func TestProxyHandler_ValidateConnection(t *testing.T) {
t.Run("not found/no cluster name", func(t *testing.T) {
err := s.handler.validateConnection(ctx, invalidTenantID, "")
require.Regexp(t, "codeParamsRoutingFailed: cluster -99 not found", err.Error())
require.True(t, errors.Is(err, highFreqErrorMarker))
})
t.Run("not found", func(t *testing.T) {
err := s.handler.validateConnection(ctx, invalidTenantID, "foo-bar")
require.Regexp(t, "codeParamsRoutingFailed: cluster foo-bar-99 not found", err.Error())
require.True(t, errors.Is(err, highFreqErrorMarker))
})
t.Run("found/tenant without name", func(t *testing.T) {
err := s.handler.validateConnection(ctx, tenantWithoutNameID, "foo-bar")
Expand All @@ -116,10 +118,12 @@ func TestProxyHandler_ValidateConnection(t *testing.T) {
t.Run("found/connection without name", func(t *testing.T) {
err := s.handler.validateConnection(ctx, tenantID, "")
require.Regexp(t, "codeParamsRoutingFailed: cluster -10 not found", err.Error())
require.True(t, errors.Is(err, highFreqErrorMarker))
})
t.Run("found/tenant name mismatch", func(t *testing.T) {
err := s.handler.validateConnection(ctx, tenantID, "foo-bar")
require.Regexp(t, "codeParamsRoutingFailed: cluster foo-bar-10 not found", err.Error())
require.True(t, errors.Is(err, highFreqErrorMarker))
})

// Stop the directory server.
Expand All @@ -130,6 +134,7 @@ func TestProxyHandler_ValidateConnection(t *testing.T) {
// Use a new tenant ID here to force GetTenant.
err := s.handler.validateConnection(ctx, roachpb.MustMakeTenantID(100), "")
require.Regexp(t, "directory server has not been started", err.Error())
require.False(t, errors.Is(err, highFreqErrorMarker))
})
}

Expand Down
Loading

0 comments on commit 4db7acb

Please sign in to comment.