Skip to content

Commit

Permalink
sqlproxy/admitter:
Browse files Browse the repository at this point in the history
Add a new admitter package which performs basic connection admission
control to rate limit connection attempts. Connection attempts are
rate limited based on source IP. Admission control is currently
purely a local decision, but the package this interface defines could
easily become the client-side for connection to a centralized service.

Note that this is a fork of the admitter package in the console repo.

Release note: none.
  • Loading branch information
Spas Bojanov committed Nov 4, 2020
1 parent 2546fd4 commit 0f14118
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 0 deletions.
149 changes: 149 additions & 0 deletions pkg/ccl/sqlproxyccl/admitter/local.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2020 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package admitter

import (
"errors"
"math/rand"
"sync"
"time"
)

const (
// The maximum size of localService's address map.
maxMapSize = 1e6 // 1 million
)

var errRequestDenied = errors.New("request denied")

type timeNow func() time.Time

type limiter struct {
// The next time an operation on this limiter can proceed.
nextTime time.Time
// The number of operation attempts that have been performed. On success, the
// limiter will be removed.
attempts int
// The index of the limiter in the addresses array.
index int
}

// localService is an admitter Service that manages state purely in local
// memory. Internally, it maintains a map from IP address to rate limiting info
// for that address. In order to put a cap on memory usage, the map is capped
// at a maximum size, at which point a random IP address will be evicted.
type localService struct {
clock timeNow
baseDelay time.Duration
maxDelay time.Duration
maxMapSize int

mu struct {
sync.Mutex
// Map from IP address to limiter.
limiters map[string]*limiter
// Array of addresses, used for randomly evicting an address when the max
// entries is reached.
addrs []string
}
}

// LocalOption allows configuration of a local admission service.
type LocalOption func(s *localService)

// WithBaseDelay specifies the base delay for rate limiting repeated accesses.
func WithBaseDelay(d time.Duration) LocalOption {
return func(s *localService) {
s.baseDelay = d
}
}

// NewLocalService returns an admitter Service that manages state purely in
// local memory.
func NewLocalService(opts ...LocalOption) Service {
s := &localService{
clock: time.Now,
baseDelay: 2 * time.Second,
maxDelay: 60 * 60 * time.Second,
maxMapSize: maxMapSize,
}
s.mu.limiters = make(map[string]*limiter)

for _, opt := range opts {
opt(s)
}
return s
}

func (s *localService) AllowRequest(ipAddress string, now time.Time) error {
s.mu.Lock()
defer s.mu.Unlock()

l := s.mu.limiters[ipAddress]
if l == nil {
l = s.addLocked(ipAddress)
}
if now.Before(l.nextTime) {
return errRequestDenied
}
s.nextLimitLocked(l)
return nil
}

func (s *localService) RequestSuccess(ipAddress string) {
s.mu.Lock()
defer s.mu.Unlock()
s.evictLocked(ipAddress)
}

func (s *localService) addLocked(addr string) *limiter {
if len(s.mu.addrs) >= s.maxMapSize {
addr := s.mu.addrs[rand.Intn(len(s.mu.limiters))]
s.evictLocked(addr)
}

l := &limiter{
index: len(s.mu.limiters),
}
s.mu.limiters[addr] = l
s.mu.addrs = append(s.mu.addrs, addr)
return l
}

func (s *localService) evictLocked(addr string) {
l := s.mu.limiters[addr]
if l == nil {
return
}

// Swap the address we're evicting to the end of the address array.
n := len(s.mu.addrs) - 1
s.mu.addrs[l.index], s.mu.addrs[n] = s.mu.addrs[n], s.mu.addrs[l.index]
// Fix-up the index of the limiter we're keeping.
s.mu.limiters[s.mu.addrs[l.index]].index = l.index
// Trim the evicted address from the address array.
s.mu.addrs = s.mu.addrs[:n]
// Delete the address from the limiters map.
delete(s.mu.limiters, addr)
}

func (s *localService) nextLimitLocked(l *limiter) {
// This calculation implements a simple capped exponential backoff. No
// randomization is done. We could use github.com/cenkalti/backoff, but this
// gives us a more control over the precise calculation and is about half the
// size in terms of memory usage. The latter part may be important for IP
// address based admission control.
delay := s.baseDelay * (1 << l.attempts)
if delay >= s.maxDelay {
delay = s.maxDelay
}
l.attempts++

l.nextTime = s.clock().Add(delay)
}
91 changes: 91 additions & 0 deletions pkg/ccl/sqlproxyccl/admitter/local_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package admitter

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
)

type fakeClock struct {
next time.Time
}

func (f *fakeClock) Now() time.Time {
return f.next
}

func (f *fakeClock) advance(d time.Duration) {
f.next = f.next.Add(d)
}

type testLocalService struct {
*localService
clock fakeClock
}

func newTestLocalService() *testLocalService {
s := &testLocalService{
localService: NewLocalService().(*localService),
}
s.localService.clock = s.clock.Now
return s
}

func (s *localService) checkInvariants() error {
s.mu.Lock()
defer s.mu.Unlock()

if len(s.mu.limiters) != len(s.mu.addrs) {
return fmt.Errorf("len(limiters) [%d] != len(addrs) [%d]", len(s.mu.limiters), len(s.mu.addrs))
}

for i, addr := range s.mu.addrs {
l := s.mu.limiters[addr]
if l.index != i {
return fmt.Errorf("limiters[addrs[%d]].index != %d (addr=%s index=%d)", i, i, addr, l.index)
}
}
return nil
}

var expectedBackOff = []int{2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 3600, 3600, 3600}

func TestLocalService_BackOff(t *testing.T) {
s := newTestLocalService()

const ipAddress = "127.0.0.1"

verifyBackOff := func() {
require.NoError(t, s.AllowRequest(ipAddress, s.clock.Now()))

for _, delay := range expectedBackOff {
require.EqualError(t, s.AllowRequest(ipAddress, s.clock.Now()), errRequestDenied.Error())

s.clock.advance(time.Duration(delay)*time.Second - time.Nanosecond)
require.EqualError(t, s.AllowRequest(ipAddress, s.clock.Now()), errRequestDenied.Error())

s.clock.advance(time.Nanosecond)
require.NoError(t, s.AllowRequest(ipAddress, s.clock.Now()))
}
}

verifyBackOff()

// If a login is successful, the exponential backoff is reset.
s.RequestSuccess(ipAddress)
verifyBackOff()
}

func TestLocalService_Eviction(t *testing.T) {
s := newTestLocalService()
s.maxMapSize = 10

for i := 0; i < 10000; i++ {
ipAddress := fmt.Sprintf("%d", i)
_ = s.AllowRequest(ipAddress, s.clock.Now())
require.Less(t, len(s.mu.limiters), 11)
require.NoError(t, s.checkInvariants())
}
}
25 changes: 25 additions & 0 deletions pkg/ccl/sqlproxyccl/admitter/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2020 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package admitter

import "time"

// Service provides the interface for performing admission checks before
// allowing requests into sqlproxy.
type Service interface {
// AllowRequest determines whether a request should be allowed to proceed. It
// rate limits requests from IP addresses regardless of tenant id.
AllowRequest(ipAddress string, now time.Time) error

// RequestSuccess records the result of a successful request.
RequestSuccess(ipAddress string)

// KnownClient checks if this client has connected successfully before.
KnownClient(ipAddress string, tenID uint64) bool
}

0 comments on commit 0f14118

Please sign in to comment.