ccl/sqlproxyccl: improve connection logging behavior #134613

Merged 2 commits on Nov 11, 2024
19 changes: 16 additions & 3 deletions pkg/ccl/sqlproxyccl/balancer/conn_tracker.go
@@ -35,6 +35,11 @@ type ConnTracker struct {
// tenants refer to a list of tenant entries.
tenants map[roachpb.TenantID]*tenantEntry
}

// verboseLogging indicates whether verbose logging is enabled for the
// connection tracker. We store it once here to avoid the vmodule mutex
// each time we call log.V.
verboseLogging bool
}

// NewConnTracker returns a new instance of the connection tracker. All exposed
@@ -49,7 +54,7 @@ func NewConnTracker(
timeSource = timeutil.DefaultTimeSource{}
}

t := &ConnTracker{timeSource: timeSource}
t := &ConnTracker{timeSource: timeSource, verboseLogging: log.V(2)}
Reviewer: What is log level 2 and how did you pick it? Are there corresponding constants like error/warn/info/etc.?

Author: Unfortunately, I don't think we have any rule of thumb around vmodule logging levels. See this internal Slack thread: https://cockroachlabs.slack.com/archives/C9TGBJB44/p1614100926026700. Each vmodule logging level seems to be specific to the file itself. I picked 2 somewhat arbitrarily; it's a middle value, and it leaves room for something less/more verbose as well.

For this case, if we wanted to display logs from the connection tracker, we would start the proxy with the following flag: --vmodule=conn_tracker=2 (i.e. --vmodule=FILE=LEVEL,FILE2=LEVEL2,...).

t.mu.tenants = make(map[roachpb.TenantID]*tenantEntry)

if err := stopper.RunAsyncTask(
@@ -79,7 +84,11 @@ func (t *ConnTracker) GetConnsMap(tenantID roachpb.TenantID) map[string][]Connec
func (t *ConnTracker) registerAssignment(tenantID roachpb.TenantID, sa *ServerAssignment) {
e := t.getEntry(tenantID, true /* allowCreate */)
if e.addAssignment(sa) {
logTrackerEvent("registerAssignment", sa)
// Explicitly use a separate `if` block to avoid unintentional short
// circuit bugs in the future. `a && b()` vs `b() && a` are not the same.
if t.verboseLogging {
logTrackerEvent("registerAssignment", sa)
}
}
}

@@ -88,7 +97,11 @@ func (t *ConnTracker) unregisterAssignment(tenantID roachpb.TenantID, sa *ServerAs
func (t *ConnTracker) unregisterAssignment(tenantID roachpb.TenantID, sa *ServerAssignment) {
e := t.getEntry(tenantID, false /* allowCreate */)
if e != nil && e.removeAssignment(sa) {
logTrackerEvent("unregisterAssignment", sa)
// Explicitly use a separate `if` block to avoid unintentional short
// circuit bugs in the future. `a && b()` vs `b() && a` are not the same.
if t.verboseLogging {
logTrackerEvent("unregisterAssignment", sa)
}
}
}

79 changes: 49 additions & 30 deletions pkg/ccl/sqlproxyccl/proxy_handler.go
@@ -174,13 +174,10 @@ type proxyHandler struct {
cancelInfoMap *cancelInfoMap
}

const throttledErrorHint string = `Connection throttling is triggered by repeated authentication failure. Make
sure the username and password are correct.
`
const throttledErrorHint string = `Connection throttling is triggered by repeated authentication failure. Make sure the username and password are correct.`

var authThrottledError = errors.WithHint(
withCode(errors.New(
"too many failed authentication attempts"), codeProxyRefusedConnection),
withCode(errors.New("too many failed authentication attempts"), codeProxyRefusedConnection),
throttledErrorHint)

// newProxyHandler will create a new proxy handler with configuration based on
@@ -372,34 +369,44 @@ func (handler *proxyHandler) handle(

// NOTE: Errors returned from this function are user-facing errors so we
// should be careful with the details that we want to expose.
//
// TODO(jaylim-crl): Update this such that we return both the internal and
// user-facing errors from clusterNameAndTenantFromParams. Only the internal
// error should be returned to the caller.
backendStartupMsg, clusterName, tenID, err := clusterNameAndTenantFromParams(ctx, fe, handler.metrics)
if err != nil {
clientErr := withCode(err, codeParamsRoutingFailed)
log.Errorf(ctx, "unable to extract cluster name and tenant id: %s", err.Error())
updateMetricsAndSendErrToClient(clientErr, fe.Conn, handler.metrics)
return clientErr
return errors.Wrap(err, "extracting cluster identifier")
}

Reviewer: Why did this block of code move?

Author: I'd like to ensure that we only add logtags after validating the connection. If we left the validation in its original location, we may end up having a log line with a mismatched cluster name and tenant ID, leading to confusion during debugging. See the block that comes after the validation:

// Only add logtags after validating the connection. If the connection isn't
// validated, clusterName may not match the tenant ID, and this could cause
// confusion when analyzing logs.
ctx = logtags.AddTag(ctx, "cluster", clusterName)
ctx = logtags.AddTag(ctx, "tenant", tenID)

The proxy uses the tenant ID to spin up a new SQL server. Back then, there were concerns that users could iterate through --cluster=random-name-1 to --cluster=random-name-N, causing the operator to inadvertently provision resources for those tenants, leading to potential resource exhaustion on the host. To address that, we implemented a validation check to ensure that "random-name" matches the actual cluster name associated with that tenant ID (as seen in the DnsPrefix field in the CrdbTenant spec, or the tenants table in CC). This validation guarantees that the user connecting to tenant N has some knowledge of the cluster beyond just the tenant ID, and prevents random or malicious requests from triggering resource allocation.

	// Validate the incoming connection and ensure that the cluster name
	// matches the tenant's. This prevents malicious actors from attempting to
if err := handler.validateConnection(ctx, tenID, clusterName); err != nil {
// We do not need to log here as validateConnection already logs.
updateMetricsAndSendErrToClient(err, fe.Conn, handler.metrics)
return err
}

// Only add logtags after validating the connection. If the connection isn't
// validated, clusterName may not match the tenant ID, and this could cause
// confusion when analyzing logs.
ctx = logtags.AddTag(ctx, "cluster", clusterName)
ctx = logtags.AddTag(ctx, "tenant", tenID)

// Add request tags so that callers can provide a better context for errors.
reqTags := requestTagsFromContext(ctx)
reqTags["cluster"] = clusterName
reqTags["tenant"] = tenID

	// Use an empty string as the default port as we only care about
	// correctly parsing the IP address here.
ipAddr, _, err := addr.SplitHostPort(fe.Conn.RemoteAddr().String(), "")
if err != nil {
clientErr := withCode(errors.New("unexpected connection address"), codeParamsRoutingFailed)
log.Errorf(ctx, "could not parse address: %v", err.Error())
updateMetricsAndSendErrToClient(clientErr, fe.Conn, handler.metrics)
return clientErr
}

// Validate the incoming connection and ensure that the cluster name
	// matches the tenant's. This prevents malicious actors from attempting to
// connect to the cluster using just the tenant ID.
if err := handler.validateConnection(ctx, tenID, clusterName); err != nil {
// We do not need to log here as validateConnection already logs.
updateMetricsAndSendErrToClient(err, fe.Conn, handler.metrics)
return err
return errors.Wrap(err, "parsing remote address")
}

errConnection := make(chan error, 1)
@@ -426,20 +433,27 @@
// with a deleting tenant. This case is rare, and we'll just return a
// "connection refused" error. The next time they connect, they will
// get a "not found" error.
log.Errorf(ctx, "connection blocked by access control list: %v", err)
err = withCode(errors.New("connection refused"), codeProxyRefusedConnection)
updateMetricsAndSendErrToClient(err, fe.Conn, handler.metrics)
return err
//
// TODO(jaylim-crl): We can enrich this error with the proper reason on
// why they were refused (e.g. IP allowlist, or private endpoints).
clientErr := withCode(errors.New("connection refused"), codeProxyRefusedConnection)
updateMetricsAndSendErrToClient(clientErr, fe.Conn, handler.metrics)
return errors.Wrap(err, "connection blocked by access control list")
}
defer removeListener()

throttleTags := throttler.ConnectionTags{IP: ipAddr, TenantID: tenID.String()}
throttleTime, err := handler.throttleService.LoginCheck(throttleTags)
throttleTime, err := handler.throttleService.LoginCheck(ctx, throttleTags)
if err != nil {
log.Errorf(ctx, "throttler refused connection: %v", err.Error())
err = authThrottledError
updateMetricsAndSendErrToClient(err, fe.Conn, handler.metrics)
return err
clientErr := authThrottledError
updateMetricsAndSendErrToClient(clientErr, fe.Conn, handler.metrics)
// The throttle service is used to rate limit invalid login attempts
// from IP addresses, and it is commonly prone to generating excessive
	// traffic in practice. Due to that, we'll return nil here to prevent
// callers from logging this request. However, LoginCheck itself
// periodically logs an error when such requests are rate limited, so
// we won't miss any signals by doing this.
return nil //nolint:returnerrcheck
}

connector := &connector{
Expand Down Expand Up @@ -471,14 +485,15 @@ func (handler *proxyHandler) handle(
if err := handler.throttleService.ReportAttempt(
ctx, throttleTags, throttleTime, status,
); err != nil {
// We have to log here because errors returned by this closure
// will be sent to the client.
log.Errorf(ctx, "throttler refused connection after authentication: %v", err.Error())
return authThrottledError
}
return nil
},
)
if err != nil {
log.Errorf(ctx, "could not connect to cluster: %v", err.Error())
if sentToClient {
handler.metrics.updateForError(err)
} else {
Expand All @@ -490,16 +505,20 @@ func (handler *proxyHandler) handle(

// Update the cancel info.
handler.cancelInfoMap.addCancelInfo(connector.CancelInfo.proxySecretID(), connector.CancelInfo)
defer func() {
handler.cancelInfoMap.deleteCancelInfo(connector.CancelInfo.proxySecretID())
}()

// Record the connection success and how long it took.
handler.metrics.ConnectionLatency.RecordValue(timeutil.Since(connReceivedTime).Nanoseconds())
handler.metrics.SuccessfulConnCount.Inc(1)

log.Infof(ctx, "new connection")
	// TODO(jaylim-crl): Consider replacing this with a metric that measures
// connection lifetime. We might also be able to fetch these by analyzing
// the session logs.
connBegin := timeutil.Now()
defer func() {
log.Infof(ctx, "closing after %.2fs", timeutil.Since(connBegin).Seconds())
handler.cancelInfoMap.deleteCancelInfo(connector.CancelInfo.proxySecretID())
}()

// Wrap the client connection with an error annotater. WARNING: The TLS
54 changes: 50 additions & 4 deletions pkg/ccl/sqlproxyccl/server.go
@@ -268,12 +268,38 @@ func (s *Server) serve(ctx context.Context, ln net.Listener, requireProxyProtoco

err = s.Stopper.RunAsyncTask(ctx, "proxy-con-serve", func(ctx context.Context) {
defer func() { _ = conn.Close() }()

s.metrics.CurConnCount.Inc(1)
defer s.metrics.CurConnCount.Dec(1)
remoteAddr := conn.RemoteAddr()
ctxWithTag := logtags.AddTag(ctx, "client", log.SafeOperational(remoteAddr))
if err := s.handler.handle(ctxWithTag, conn, requireProxyProtocol); err != nil {
log.Infof(ctxWithTag, "connection error: %v", err)

ctx = logtags.AddTag(ctx, "client", log.SafeOperational(conn.RemoteAddr()))

// Use a map to collect request-specific information at higher
// layers of the stack. This helps ensure that all relevant
// information is captured, providing better context for the error
// logs.
//
// We could improve this by creating a custom context.Context object
// to track all data related to the request (including migration
// history). For now, this approach is adequate.
reqTags := make(map[string]interface{})
ctx = contextWithRequestTags(ctx, reqTags)

err := s.handler.handle(ctx, conn, requireProxyProtocol)
if err != nil && !errors.Is(err, context.Canceled) {
for key, value := range reqTags {
ctx = logtags.AddTag(ctx, key, value)
}
Reviewer: Can we also change the behavior of log.Infof or is that a much larger task?

Author: The log package (https://github.com/cockroachdb/cockroach/tree/master/pkg/util/log) is utilized extensively throughout the CRDB codebase, and I'd imagine there could be a scenario where we actually want hints. For the proxy's use case, I think the right thing to do here is to avoid returning user-facing error messages from handle. This would require a deeper audit, and some refactoring of the logic and tests. For now, this approach works to address the unnecessary log spam issue.

			// log.Infof automatically prints hints (one per line) that are
// associated with the input error object. This causes
// unnecessary log spam, especially when proxy hints are meant
// for the user. We will intentionally create a new error object
// without the hints just for logging purposes.
//
// TODO(jaylim-crl): Ensure that handle does not return user
// facing errors (i.e. one that contains hints).
errWithoutHints := errors.Newf("%s", err.Error()) // nolint:errwrap
log.Infof(ctx, "connection closed: %v", errWithoutHints)
}
})
if err != nil {
@@ -322,3 +348,23 @@ func (s *Server) AwaitNoConnections(ctx context.Context) <-chan struct{} {

return c
}

// requestTagsContextKey is the type of a context.Value key used to carry the
// request tags map in a context.Context object.
type requestTagsContextKey struct{}

// contextWithRequestTags returns a context annotated with the provided request
// tags map. Use requestTagsFromContext(ctx) to retrieve it back.
func contextWithRequestTags(ctx context.Context, reqTags map[string]interface{}) context.Context {
return context.WithValue(ctx, requestTagsContextKey{}, reqTags)
}

// requestTagsFromContext retrieves the request tags map stored in the context
// via contextWithRequestTags.
func requestTagsFromContext(ctx context.Context) map[string]interface{} {
r := ctx.Value(requestTagsContextKey{})
if r == nil {
return nil
}
return r.(map[string]interface{})
}
12 changes: 11 additions & 1 deletion pkg/ccl/sqlproxyccl/throttler/local.go
@@ -72,6 +72,8 @@ func NewLocalService(opts ...LocalOption) Service {
return s
}

var _ Service = (*localService)(nil)
Reviewer: What does this line do?

Author: This ensures that localService implements the Service interface at compile time. This is the same as doing:

var _ Service = &localService{}

We use this pattern in CC as well.


func (s *localService) lockedGetThrottle(connection ConnectionTags) *throttle {
l, ok := s.mu.throttleCache.Get(connection)
if ok && l != nil {
@@ -86,18 +88,26 @@ func (s *localService) lockedInsertThrottle(connection ConnectionTags) *throttle
return l
}

func (s *localService) LoginCheck(connection ConnectionTags) (time.Time, error) {
// LoginCheck implements the Service interface.
func (s *localService) LoginCheck(
ctx context.Context, connection ConnectionTags,
) (time.Time, error) {
s.mu.Lock()
defer s.mu.Unlock()

now := s.clock()
throttle := s.lockedGetThrottle(connection)
if throttle != nil && throttle.isThrottled(now) {
if throttle.everyLog.ShouldLog() {
// ctx should include logtags about the connection.
log.Error(ctx, "throttler refused connection due to too many failed authentication attempts")
}
Author (on lines +101 to +104): If we didn't want the service to probe into the internals of the throttle struct (e.g. everyLog), this could be its own method on throttle (e.g. reportThrottled), but I don't feel strongly.

return now, errRequestDenied
}
return now, nil
}

// ReportAttempt implements the Service interface.
func (s *localService) ReportAttempt(
ctx context.Context, connection ConnectionTags, throttleTime time.Time, status AttemptStatus,
) error {
19 changes: 12 additions & 7 deletions pkg/ccl/sqlproxyccl/throttler/local_test.go
@@ -33,6 +33,8 @@ type testLocalService struct {
clock fakeClock
}

var _ Service = (*testLocalService)(nil)

func newTestLocalService(opts ...LocalOption) *testLocalService {
s := &testLocalService{
localService: NewLocalService(opts...).(*localService),
@@ -49,16 +51,17 @@ func countGuesses(
step time.Duration,
period time.Duration,
) int {
ctx := context.Background()
count := 0
for i := 0; step*time.Duration(i) < period; i++ {
throttle.clock.advance(step)

throttleTime, err := throttle.LoginCheck(connection)
throttleTime, err := throttle.LoginCheck(ctx, connection)
if err != nil {
continue
}

err = throttle.ReportAttempt(context.Background(), connection, throttleTime, AttemptInvalidCredentials)
err = throttle.ReportAttempt(ctx, connection, throttleTime, AttemptInvalidCredentials)
require.NoError(t, err, "ReportAttempt should only return errors in the case of racing requests")

count++
@@ -95,13 +98,14 @@ func TestReportSuccessDisablesLimiter(t *testing.T) {
defer leaktest.AfterTest(t)()
testutilsccl.ServerlessOnly(t)

ctx := context.Background()
throttle := newTestLocalService()
tenant1 := ConnectionTags{IP: "1.1.1.1", TenantID: "1"}
tenant2 := ConnectionTags{IP: "1.1.1.1", TenantID: "2"}

throttleTime, err := throttle.LoginCheck(tenant1)
throttleTime, err := throttle.LoginCheck(ctx, tenant1)
require.NoError(t, err)
require.NoError(t, throttle.ReportAttempt(context.Background(), tenant1, throttleTime, AttemptOK))
require.NoError(t, throttle.ReportAttempt(ctx, tenant1, throttleTime, AttemptOK))

require.Equal(t,
int(time.Hour/time.Second),
@@ -120,19 +124,20 @@ func TestRacingRequests(t *testing.T) {
defer leaktest.AfterTest(t)()
testutilsccl.ServerlessOnly(t)

ctx := context.Background()
throttle := newTestLocalService()
connection := ConnectionTags{IP: "1.1.1.1", TenantID: "1"}

throttleTime, err := throttle.LoginCheck(connection)
throttleTime, err := throttle.LoginCheck(ctx, connection)
require.NoError(t, err)

require.NoError(t, throttle.ReportAttempt(context.Background(), connection, throttleTime, AttemptInvalidCredentials))
require.NoError(t, throttle.ReportAttempt(ctx, connection, throttleTime, AttemptInvalidCredentials))

l := throttle.lockedGetThrottle(connection)
nextTime := l.nextTime

for _, status := range []AttemptStatus{AttemptOK, AttemptInvalidCredentials} {
require.Error(t, throttle.ReportAttempt(context.Background(), connection, throttleTime, status))
require.Error(t, throttle.ReportAttempt(ctx, connection, throttleTime, status))

		// Verify the throttled report has no effect on limiter state.
l := throttle.lockedGetThrottle(connection)
4 changes: 2 additions & 2 deletions pkg/ccl/sqlproxyccl/throttler/service.go
@@ -37,7 +37,7 @@
type Service interface {
// LoginCheck determines whether a login request should be allowed to
// proceed. It rate limits login attempts from IP addresses.
LoginCheck(connection ConnectionTags) (time.Time, error)
LoginCheck(ctx context.Context, connection ConnectionTags) (time.Time, error)

// Report an authentication attempt. The throttleTime is used to
// retroactively throttle the request if a racing request triggered the
@@ -46,5 +46,5 @@ // error instead of authentication success/failure. This limits the
// error instead of authentication success/failure. This limits the
// information a malicious user gets from using racing requests to guess
// multiple passwords in one throttle window.
ReportAttempt(context context.Context, connection ConnectionTags, throttleTime time.Time, status AttemptStatus) error
ReportAttempt(ctx context.Context, connection ConnectionTags, throttleTime time.Time, status AttemptStatus) error
}