Skip to content

Commit

Permalink
ccl/sqlproxyccl: rate limit throttler errors
Browse files Browse the repository at this point in the history
Previously, each request refused by the throttler would result in a
"throttler refused connection" message being logged, generating a log entry
for every rejected request.

The throttler service is responsible for rate limiting invalid login attempts
from IP addresses, and in practice, it can generate a high volume of such
traffic. To address this, errors are now rate limited in the logs to occur
once every 5 minutes per (IP, tenant) pair, ensuring that only one log entry
is generated within that time frame.

This change is internal, so no release note is required.

Epic: none

Release note: None
  • Loading branch information
jaylim-crl committed Nov 8, 2024
1 parent 4d5e31d commit 981066a
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 14 deletions.
10 changes: 8 additions & 2 deletions pkg/ccl/sqlproxyccl/proxy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,11 +443,17 @@ func (handler *proxyHandler) handle(
defer removeListener()

throttleTags := throttler.ConnectionTags{IP: ipAddr, TenantID: tenID.String()}
throttleTime, err := handler.throttleService.LoginCheck(throttleTags)
throttleTime, err := handler.throttleService.LoginCheck(ctx, throttleTags)
if err != nil {
clientErr := authThrottledError
updateMetricsAndSendErrToClient(clientErr, fe.Conn, handler.metrics)
return errors.Wrap(err, "throttler refused connection")
// The throttle service is used to rate limit invalid login attempts
// from IP addresses, and it is commonly prone to generating excessive
// traffic in practice. Due to that, we'll return a nil here to prevent
// callers from logging this request. However, LoginCheck itself
// periodically logs an error when such requests are rate limited, so
// we won't miss any signals by doing this.
return nil
}

connector := &connector{
Expand Down
12 changes: 11 additions & 1 deletion pkg/ccl/sqlproxyccl/throttler/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func NewLocalService(opts ...LocalOption) Service {
return s
}

var _ Service = (*localService)(nil)

func (s *localService) lockedGetThrottle(connection ConnectionTags) *throttle {
l, ok := s.mu.throttleCache.Get(connection)
if ok && l != nil {
Expand All @@ -86,18 +88,26 @@ func (s *localService) lockedInsertThrottle(connection ConnectionTags) *throttle
return l
}

func (s *localService) LoginCheck(connection ConnectionTags) (time.Time, error) {
// LoginCheck implements the Service interface.
func (s *localService) LoginCheck(
ctx context.Context, connection ConnectionTags,
) (time.Time, error) {
s.mu.Lock()
defer s.mu.Unlock()

now := s.clock()
throttle := s.lockedGetThrottle(connection)
if throttle != nil && throttle.isThrottled(now) {
if throttle.everyLog.ShouldLog() {
// ctx should include logtags about the connection.
log.Error(ctx, "throttler refused connection due to too many failed authentication attempts")
}
return now, errRequestDenied
}
return now, nil
}

// ReportAttempt implements the Service interface.
func (s *localService) ReportAttempt(
ctx context.Context, connection ConnectionTags, throttleTime time.Time, status AttemptStatus,
) error {
Expand Down
19 changes: 12 additions & 7 deletions pkg/ccl/sqlproxyccl/throttler/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type testLocalService struct {
clock fakeClock
}

var _ Service = (*testLocalService)(nil)

func newTestLocalService(opts ...LocalOption) *testLocalService {
s := &testLocalService{
localService: NewLocalService(opts...).(*localService),
Expand All @@ -49,16 +51,17 @@ func countGuesses(
step time.Duration,
period time.Duration,
) int {
ctx := context.Background()
count := 0
for i := 0; step*time.Duration(i) < period; i++ {
throttle.clock.advance(step)

throttleTime, err := throttle.LoginCheck(connection)
throttleTime, err := throttle.LoginCheck(ctx, connection)
if err != nil {
continue
}

err = throttle.ReportAttempt(context.Background(), connection, throttleTime, AttemptInvalidCredentials)
err = throttle.ReportAttempt(ctx, connection, throttleTime, AttemptInvalidCredentials)
require.NoError(t, err, "ReportAttempt should only return errors in the case of racing requests")

count++
Expand Down Expand Up @@ -95,13 +98,14 @@ func TestReportSuccessDisablesLimiter(t *testing.T) {
defer leaktest.AfterTest(t)()
testutilsccl.ServerlessOnly(t)

ctx := context.Background()
throttle := newTestLocalService()
tenant1 := ConnectionTags{IP: "1.1.1.1", TenantID: "1"}
tenant2 := ConnectionTags{IP: "1.1.1.1", TenantID: "2"}

throttleTime, err := throttle.LoginCheck(tenant1)
throttleTime, err := throttle.LoginCheck(ctx, tenant1)
require.NoError(t, err)
require.NoError(t, throttle.ReportAttempt(context.Background(), tenant1, throttleTime, AttemptOK))
require.NoError(t, throttle.ReportAttempt(ctx, tenant1, throttleTime, AttemptOK))

require.Equal(t,
int(time.Hour/time.Second),
Expand All @@ -120,19 +124,20 @@ func TestRacingRequests(t *testing.T) {
defer leaktest.AfterTest(t)()
testutilsccl.ServerlessOnly(t)

ctx := context.Background()
throttle := newTestLocalService()
connection := ConnectionTags{IP: "1.1.1.1", TenantID: "1"}

throttleTime, err := throttle.LoginCheck(connection)
throttleTime, err := throttle.LoginCheck(ctx, connection)
require.NoError(t, err)

require.NoError(t, throttle.ReportAttempt(context.Background(), connection, throttleTime, AttemptInvalidCredentials))
require.NoError(t, throttle.ReportAttempt(ctx, connection, throttleTime, AttemptInvalidCredentials))

l := throttle.lockedGetThrottle(connection)
nextTime := l.nextTime

for _, status := range []AttemptStatus{AttemptOK, AttemptInvalidCredentials} {
require.Error(t, throttle.ReportAttempt(context.Background(), connection, throttleTime, status))
require.Error(t, throttle.ReportAttempt(ctx, connection, throttleTime, status))

// Verify the throttled report has no affect on limiter state.
l := throttle.lockedGetThrottle(connection)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/sqlproxyccl/throttler/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (
type Service interface {
// LoginCheck determines whether a login request should be allowed to
// proceed. It rate limits login attempts from IP addresses.
LoginCheck(connection ConnectionTags) (time.Time, error)
LoginCheck(ctx context.Context, connection ConnectionTags) (time.Time, error)

// Report an authentication attempt. The throttleTime is used to
// retroactively throttle the request if a racing request triggered the
Expand All @@ -46,5 +46,5 @@ type Service interface {
// error instead of authentication success/failure. This limits the
// information a malicious user gets from using racing requests to guess
// multiple passwords in one throttle window.
ReportAttempt(context context.Context, connection ConnectionTags, throttleTime time.Time, status AttemptStatus) error
ReportAttempt(ctx context.Context, connection ConnectionTags, throttleTime time.Time, status AttemptStatus) error
}
14 changes: 12 additions & 2 deletions pkg/ccl/sqlproxyccl/throttler/throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@

package throttler

import "time"
import (
"time"

"github.com/cockroachdb/cockroach/pkg/util/log"
)

const (
// throttleDisabled is a sentinal value used to disable the throttle.
// throttleDisabled is a sentinel value used to disable the throttle.
throttleDisabled = time.Duration(0)
// throttleLogErrorDuration indicates how frequent the throttle error should
// be logged for a given throttle instance.
throttleLogErrorDuration = 5 * time.Minute
)

type throttle struct {
Expand All @@ -18,12 +25,15 @@ type throttle struct {
// The amount of backoff to introduce the next time the throttle
// is triggered. Setting nextBackoff to zero disables the throttle.
nextBackoff time.Duration
// everyLog controls how frequent the throttle error should be logged.
everyLog log.EveryN
}

func newThrottle(initialBackoff time.Duration) *throttle {
return &throttle{
nextTime: time.Time{},
nextBackoff: initialBackoff,
everyLog: log.Every(throttleLogErrorDuration),
}
}

Expand Down

0 comments on commit 981066a

Please sign in to comment.