Skip to content

Commit

Permalink
ccl/sqlproxyccl: improve connection logging behavior
Browse files Browse the repository at this point in the history
Previously, the connection logging behavior in the proxy had several issues
that led to unnecessary log spam:

1. **Excessive "registerAssignment" and "unregisterAssignment" logs**: These
   logs were emitted for every connection attempt (including migration) and
   were not useful under normal operation.
2. **Redundant error logging**: Most errors were logged twice -- once in the
   `handle` method, and again when it returns.
3. **Unfiltered error hints**: User-facing errors containing hints were logged
   line by line, cluttering the logs.
4. **Lack of context in error logs**: Errors logged from the proxy lacked
   tenant and cluster context, which made troubleshooting more difficult.

This commit addresses these issues as follows:

1. Reduced logging: "registerAssignment" and "unregisterAssignment" logs are
   now only shown with vmodule logging enabled.
2. Error logging improvements: `handle` no longer logs errors (with the
   exception of some cases); the caller is now responsible for logging.
   Additionally, errors with hints are no longer logged, only the main error is
   recorded. When those errors are logged, they will now include the tenant
   and cluster information where possible.

No release note as this is an internal change.

Epic: none

Release note: None
  • Loading branch information
jaylim-crl committed Nov 8, 2024
1 parent a44a9b1 commit 065c944
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 27 deletions.
19 changes: 16 additions & 3 deletions pkg/ccl/sqlproxyccl/balancer/conn_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ type ConnTracker struct {
// tenants refer to a list of tenant entries.
tenants map[roachpb.TenantID]*tenantEntry
}

// verboseLogging indicates whether verbose logging is enabled for the
// connection tracker. We store it once here to avoid the vmodule mutex
// each time we call log.V.
verboseLogging bool
}

// NewConnTracker returns a new instance of the connection tracker. All exposed
Expand All @@ -49,7 +54,7 @@ func NewConnTracker(
timeSource = timeutil.DefaultTimeSource{}
}

t := &ConnTracker{timeSource: timeSource}
t := &ConnTracker{timeSource: timeSource, verboseLogging: log.V(2)}
t.mu.tenants = make(map[roachpb.TenantID]*tenantEntry)

if err := stopper.RunAsyncTask(
Expand Down Expand Up @@ -79,7 +84,11 @@ func (t *ConnTracker) GetConnsMap(tenantID roachpb.TenantID) map[string][]Connec
func (t *ConnTracker) registerAssignment(tenantID roachpb.TenantID, sa *ServerAssignment) {
e := t.getEntry(tenantID, true /* allowCreate */)
if e.addAssignment(sa) {
logTrackerEvent("registerAssignment", sa)
// Explicitly use a separate `if` block to avoid unintentional short
// circuit bugs in the future. `a && b()` vs `b() && a` are not the same.
if t.verboseLogging {
logTrackerEvent("registerAssignment", sa)
}
}
}

Expand All @@ -88,7 +97,11 @@ func (t *ConnTracker) registerAssignment(tenantID roachpb.TenantID, sa *ServerAs
func (t *ConnTracker) unregisterAssignment(tenantID roachpb.TenantID, sa *ServerAssignment) {
e := t.getEntry(tenantID, false /* allowCreate */)
if e != nil && e.removeAssignment(sa) {
logTrackerEvent("unregisterAssignment", sa)
// Explicitly use a separate `if` block to avoid unintentional short
// circuit bugs in the future. `a && b()` vs `b() && a` are not the same.
if t.verboseLogging {
logTrackerEvent("unregisterAssignment", sa)
}
}
}

Expand Down
49 changes: 29 additions & 20 deletions pkg/ccl/sqlproxyccl/proxy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,10 @@ type proxyHandler struct {
cancelInfoMap *cancelInfoMap
}

const throttledErrorHint string = `Connection throttling is triggered by repeated authentication failure. Make
sure the username and password are correct.
`
const throttledErrorHint string = `Connection throttling is triggered by repeated authentication failure. Make sure the username and password are correct.`

var authThrottledError = errors.WithHint(
withCode(errors.New(
"too many failed authentication attempts"), codeProxyRefusedConnection),
withCode(errors.New("too many failed authentication attempts"), codeProxyRefusedConnection),
throttledErrorHint)

// newProxyHandler will create a new proxy handler with configuration based on
Expand Down Expand Up @@ -372,14 +369,21 @@ func (handler *proxyHandler) handle(

// NOTE: Errors returned from this function are user-facing errors so we
// should be careful with the details that we want to expose.
//
// TODO(jaylim-crl): Update this such that we return both the internal and
// user-facing errors from clusterNameAndTenantFromParams. Only the internal
// error should be returned to the caller.
backendStartupMsg, clusterName, tenID, err := clusterNameAndTenantFromParams(ctx, fe, handler.metrics)
if err != nil {
clientErr := withCode(err, codeParamsRoutingFailed)
log.Errorf(ctx, "unable to extract cluster name and tenant id: %s", err.Error())
updateMetricsAndSendErrToClient(clientErr, fe.Conn, handler.metrics)
return clientErr
return errors.Wrap(err, "extracting cluster identifier")
}

// Add request tags so that callers can provide a better context for errors.
reqTags := requestTagsFromContext(ctx)
reqTags["cluster"] = clusterName
reqTags["tenant"] = tenID
ctx = logtags.AddTag(ctx, "cluster", clusterName)
ctx = logtags.AddTag(ctx, "tenant", tenID)

Expand All @@ -388,9 +392,8 @@ func (handler *proxyHandler) handle(
ipAddr, _, err := addr.SplitHostPort(fe.Conn.RemoteAddr().String(), "")
if err != nil {
clientErr := withCode(errors.New("unexpected connection address"), codeParamsRoutingFailed)
log.Errorf(ctx, "could not parse address: %v", err.Error())
updateMetricsAndSendErrToClient(clientErr, fe.Conn, handler.metrics)
return clientErr
return errors.Wrap(err, "parsing remote address")
}

// Validate the incoming connection and ensure that the cluster name
Expand Down Expand Up @@ -426,20 +429,21 @@ func (handler *proxyHandler) handle(
// with a deleting tenant. This case is rare, and we'll just return a
// "connection refused" error. The next time they connect, they will
// get a "not found" error.
log.Errorf(ctx, "connection blocked by access control list: %v", err)
err = withCode(errors.New("connection refused"), codeProxyRefusedConnection)
updateMetricsAndSendErrToClient(err, fe.Conn, handler.metrics)
return err
//
// TODO(jaylim-crl): We can enrich this error with the proper reason on
// why they were refused (e.g. IP allowlist, or private endpoints).
clientErr := withCode(errors.New("connection refused"), codeProxyRefusedConnection)
updateMetricsAndSendErrToClient(clientErr, fe.Conn, handler.metrics)
return errors.Wrap(err, "connection blocked by access control list")
}
defer removeListener()

throttleTags := throttler.ConnectionTags{IP: ipAddr, TenantID: tenID.String()}
throttleTime, err := handler.throttleService.LoginCheck(throttleTags)
if err != nil {
log.Errorf(ctx, "throttler refused connection: %v", err.Error())
err = authThrottledError
updateMetricsAndSendErrToClient(err, fe.Conn, handler.metrics)
return err
clientErr := authThrottledError
updateMetricsAndSendErrToClient(clientErr, fe.Conn, handler.metrics)
return errors.Wrap(err, "throttler refused connection")
}

connector := &connector{
Expand Down Expand Up @@ -471,14 +475,15 @@ func (handler *proxyHandler) handle(
if err := handler.throttleService.ReportAttempt(
ctx, throttleTags, throttleTime, status,
); err != nil {
// We have to log here because errors returned by this closure
// will be sent to the client.
log.Errorf(ctx, "throttler refused connection after authentication: %v", err.Error())
return authThrottledError
}
return nil
},
)
if err != nil {
log.Errorf(ctx, "could not connect to cluster: %v", err.Error())
if sentToClient {
handler.metrics.updateForError(err)
} else {
Expand All @@ -490,16 +495,20 @@ func (handler *proxyHandler) handle(

// Update the cancel info.
handler.cancelInfoMap.addCancelInfo(connector.CancelInfo.proxySecretID(), connector.CancelInfo)
defer func() {
handler.cancelInfoMap.deleteCancelInfo(connector.CancelInfo.proxySecretID())
}()

// Record the connection success and how long it took.
handler.metrics.ConnectionLatency.RecordValue(timeutil.Since(connReceivedTime).Nanoseconds())
handler.metrics.SuccessfulConnCount.Inc(1)

log.Infof(ctx, "new connection")
	// TODO(jaylim-crl): Consider replacing this with a metric that measures
// connection lifetime. We might also be able to fetch these by analyzing
// the session logs.
connBegin := timeutil.Now()
defer func() {
log.Infof(ctx, "closing after %.2fs", timeutil.Since(connBegin).Seconds())
handler.cancelInfoMap.deleteCancelInfo(connector.CancelInfo.proxySecretID())
}()

	// Wrap the client connection with an error annotator. WARNING: The TLS
Expand Down
54 changes: 50 additions & 4 deletions pkg/ccl/sqlproxyccl/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,38 @@ func (s *Server) serve(ctx context.Context, ln net.Listener, requireProxyProtoco

err = s.Stopper.RunAsyncTask(ctx, "proxy-con-serve", func(ctx context.Context) {
defer func() { _ = conn.Close() }()

s.metrics.CurConnCount.Inc(1)
defer s.metrics.CurConnCount.Dec(1)
remoteAddr := conn.RemoteAddr()
ctxWithTag := logtags.AddTag(ctx, "client", log.SafeOperational(remoteAddr))
if err := s.handler.handle(ctxWithTag, conn, requireProxyProtocol); err != nil {
log.Infof(ctxWithTag, "connection error: %v", err)

ctx = logtags.AddTag(ctx, "client", log.SafeOperational(conn.RemoteAddr()))

// Use a map to collect request-specific information at higher
// layers of the stack. This helps ensure that all relevant
// information is captured, providing better context for the error
// logs.
//
// We could improve this by creating a custom context.Context object
// to track all data related to the request (including migration
// history). For now, this approach is adequate.
reqTags := make(map[string]interface{})
ctx = contextWithRequestTags(ctx, reqTags)

err := s.handler.handle(ctx, conn, requireProxyProtocol)
if err != nil && !errors.Is(err, context.Canceled) {
for key, value := range reqTags {
ctx = logtags.AddTag(ctx, key, value)
}
// log.Infof automatically prints hints (one per line) that are
// associated with the input error object. This causes
// unnecessary log spam, especially when proxy hints are meant
// for the user. We will intentionally create a new error object
// without the hints just for logging purposes.
//
// TODO(jaylim-crl): Ensure that handle does not return user
// facing errors (i.e. one that contains hints).
errWithoutHints := errors.Newf("%s", err.Error()) // nolint:errwrap
log.Infof(ctx, "connection error: %v", errWithoutHints)
}
})
if err != nil {
Expand Down Expand Up @@ -322,3 +348,23 @@ func (s *Server) AwaitNoConnections(ctx context.Context) <-chan struct{} {

return c
}

// requestTagsContextKey is the type of a context.Value key used to carry the
// request tags map in a context.Context object.
type requestTagsContextKey struct{}

// contextWithRequestTags returns a context annotated with the provided request
// tags map. Use requestTagsFromContext(ctx) to retrieve it back.
func contextWithRequestTags(ctx context.Context, reqTags map[string]interface{}) context.Context {
return context.WithValue(ctx, requestTagsContextKey{}, reqTags)
}

// requestTagsFromContext retrieves the request tags map stored in the context
// via contextWithRequestTags.
func requestTagsFromContext(ctx context.Context) map[string]interface{} {
r := ctx.Value(requestTagsContextKey{})
if r == nil {
return nil
}
return r.(map[string]interface{})
}

0 comments on commit 065c944

Please sign in to comment.