diff --git a/pkg/ccl/sqlproxyccl/admitter/local.go b/pkg/ccl/sqlproxyccl/admitter/local.go new file mode 100644 index 000000000000..1b6c613b6371 --- /dev/null +++ b/pkg/ccl/sqlproxyccl/admitter/local.go @@ -0,0 +1,149 @@ +// Copyright 2020 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package admitter + +import ( + "errors" + "math/rand" + "sync" + "time" +) + +const ( + // The maximum size of localService's address map. + maxMapSize = 1e6 // 1 million +) + +var errRequestDenied = errors.New("request denied") + +type timeNow func() time.Time + +type limiter struct { + // The next time an operation on this limiter can proceed. + nextTime time.Time + // The number of operation attempts that have been performed. On success, the + // limiter will be removed. + attempts int + // The index of the limiter in the addresses array. + index int +} + +// localService is an admitter Service that manages state purely in local +// memory. Internally, it maintains a map from IP address to rate limiting info +// for that address. In order to put a cap on memory usage, the map is capped +// at a maximum size, at which point a random IP address will be evicted. +type localService struct { + clock timeNow + baseDelay time.Duration + maxDelay time.Duration + maxMapSize int + + mu struct { + sync.Mutex + // Map from IP address to limiter. + limiters map[string]*limiter + // Array of addresses, used for randomly evicting an address when the max + // entries is reached. + addrs []string + } +} + +// LocalOption allows configuration of a local admission service. +type LocalOption func(s *localService) + +// WithBaseDelay specifies the base delay for rate limiting repeated accesses. +func WithBaseDelay(d time.Duration) LocalOption { + return func(s *localService) { + s.baseDelay = d + } +} + +// NewLocalService returns an admitter Service that manages state purely in +// local memory. +func NewLocalService(opts ...LocalOption) Service { + s := &localService{ + clock: time.Now, + baseDelay: 2 * time.Second, + maxDelay: 60 * 60 * time.Second, + maxMapSize: maxMapSize, + } + s.mu.limiters = make(map[string]*limiter) + + for _, opt := range opts { + opt(s) + } + return s +} + +func (s *localService) AllowRequest(ipAddress string, now time.Time) error { + s.mu.Lock() + defer s.mu.Unlock() + + l := s.mu.limiters[ipAddress] + if l == nil { + l = s.addLocked(ipAddress) + } + if now.Before(l.nextTime) { + return errRequestDenied + } + s.nextLimitLocked(l) + return nil +} + +func (s *localService) RequestSuccess(ipAddress string) { + s.mu.Lock() + defer s.mu.Unlock() + s.evictLocked(ipAddress) +} + +func (s *localService) addLocked(addr string) *limiter { + if len(s.mu.addrs) >= s.maxMapSize { + addr := s.mu.addrs[rand.Intn(len(s.mu.limiters))] + s.evictLocked(addr) + } + + l := &limiter{ + index: len(s.mu.limiters), + } + s.mu.limiters[addr] = l + s.mu.addrs = append(s.mu.addrs, addr) + return l +} + +func (s *localService) evictLocked(addr string) { + l := s.mu.limiters[addr] + if l == nil { + return + } + + // Swap the address we're evicting to the end of the address array. + n := len(s.mu.addrs) - 1 + s.mu.addrs[l.index], s.mu.addrs[n] = s.mu.addrs[n], s.mu.addrs[l.index] + // Fix-up the index of the limiter we're keeping. + s.mu.limiters[s.mu.addrs[l.index]].index = l.index + // Trim the evicted address from the address array. + s.mu.addrs = s.mu.addrs[:n] + // Delete the address from the limiters map. + delete(s.mu.limiters, addr) +} + +func (s *localService) nextLimitLocked(l *limiter) { + // This calculation implements a simple capped exponential backoff. No + // randomization is done. We could use github.com/cenkalti/backoff, but this + // gives us a more control over the precise calculation and is about half the + // size in terms of memory usage. The latter part may be important for IP + // address based admission control. + delay := s.baseDelay * (1 << l.attempts) + if delay >= s.maxDelay { + delay = s.maxDelay + } + l.attempts++ + + l.nextTime = s.clock().Add(delay) +} diff --git a/pkg/ccl/sqlproxyccl/admitter/local_test.go b/pkg/ccl/sqlproxyccl/admitter/local_test.go new file mode 100644 index 000000000000..009bbbc4f1fb --- /dev/null +++ b/pkg/ccl/sqlproxyccl/admitter/local_test.go @@ -0,0 +1,91 @@ +package admitter + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type fakeClock struct { + next time.Time +} + +func (f *fakeClock) Now() time.Time { + return f.next +} + +func (f *fakeClock) advance(d time.Duration) { + f.next = f.next.Add(d) +} + +type testLocalService struct { + *localService + clock fakeClock +} + +func newTestLocalService() *testLocalService { + s := &testLocalService{ + localService: NewLocalService().(*localService), + } + s.localService.clock = s.clock.Now + return s +} + +func (s *localService) checkInvariants() error { + s.mu.Lock() + defer s.mu.Unlock() + + if len(s.mu.limiters) != len(s.mu.addrs) { + return fmt.Errorf("len(limiters) [%d] != len(addrs) [%d]", len(s.mu.limiters), len(s.mu.addrs)) + } + + for i, addr := range s.mu.addrs { + l := s.mu.limiters[addr] + if l.index != i { + return fmt.Errorf("limiters[addrs[%d]].index != %d (addr=%s index=%d)", i, i, addr, l.index) + } + } + return nil +} + +var expectedBackOff = []int{2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 3600, 3600, 3600} + +func TestLocalService_BackOff(t *testing.T) { + s := newTestLocalService() + + const ipAddress = "127.0.0.1" + + verifyBackOff := func() { + require.NoError(t, s.AllowRequest(ipAddress, s.clock.Now())) + + for _, delay := range expectedBackOff { + require.EqualError(t, s.AllowRequest(ipAddress, s.clock.Now()), errRequestDenied.Error()) + + s.clock.advance(time.Duration(delay)*time.Second - time.Nanosecond) + require.EqualError(t, s.AllowRequest(ipAddress, s.clock.Now()), errRequestDenied.Error()) + + s.clock.advance(time.Nanosecond) + require.NoError(t, s.AllowRequest(ipAddress, s.clock.Now())) + } + } + + verifyBackOff() + + // If a login is successful, the exponential backoff is reset. + s.RequestSuccess(ipAddress) + verifyBackOff() +} + +func TestLocalService_Eviction(t *testing.T) { + s := newTestLocalService() + s.maxMapSize = 10 + + for i := 0; i < 10000; i++ { + ipAddress := fmt.Sprintf("%d", i) + _ = s.AllowRequest(ipAddress, s.clock.Now()) + require.Less(t, len(s.mu.limiters), 11) + require.NoError(t, s.checkInvariants()) + } +} diff --git a/pkg/ccl/sqlproxyccl/admitter/service.go b/pkg/ccl/sqlproxyccl/admitter/service.go new file mode 100644 index 000000000000..5473b816e366 --- /dev/null +++ b/pkg/ccl/sqlproxyccl/admitter/service.go @@ -0,0 +1,25 @@ +// Copyright 2020 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package admitter + +import "time" + +// Service provides the interface for performing admission checks before +// allowing requests into sqlproxy. +type Service interface { + // AllowRequest determines whether a request should be allowed to proceed. It + // rate limits requests from IP addresses regardless of tenant id. + AllowRequest(ipAddress string, now time.Time) error + + // RequestSuccess records the result of a successful request. + RequestSuccess(ipAddress string) + + // KnownClient checks if this client has connected successfully before. + KnownClient(ipAddress string, tenID uint64) bool +}