Skip to content

Commit

Permalink
sqlproxy: add basic login rate limiting using admitter
Browse files Browse the repository at this point in the history
For simplicity we only limit on IP address. However when a connection
to a given tenant from an IP succeeds we allow all future connections
from this IP to this tenant. The successes are cached in memory hence
they will not survive across proxy restarts.

Release note: none.
  • Loading branch information
Spas Bojanov committed Nov 4, 2020
1 parent 0f14118 commit df9b92f
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 4 deletions.
20 changes: 19 additions & 1 deletion pkg/ccl/sqlproxyccl/admitter/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type localService struct {
sync.Mutex
// Map from IP address to limiter.
limiters map[string]*limiter
// Map from IP address to known tenant ids for this IP.
knownTenants map[string]map[uint64]bool
// Array of addresses, used for randomly evicting an address when the max
// entries is reached.
addrs []string
Expand All @@ -74,6 +76,7 @@ func NewLocalService(opts ...LocalOption) Service {
maxMapSize: maxMapSize,
}
s.mu.limiters = make(map[string]*limiter)
s.mu.knownTenants = make(map[string]map[uint64]bool)

for _, opt := range opts {
opt(s)
Expand All @@ -96,10 +99,25 @@ func (s *localService) AllowRequest(ipAddress string, now time.Time) error {
return nil
}

func (s *localService) RequestSuccess(ipAddress string) {
func (s *localService) RequestSuccess(ipAddress string, tenID uint64) {
s.mu.Lock()
defer s.mu.Unlock()
s.evictLocked(ipAddress)
if _, ok := s.mu.knownTenants[ipAddress]; !ok {
s.mu.knownTenants[ipAddress] = map[uint64]bool{tenID: true}
} else {
s.mu.knownTenants[ipAddress][tenID] = true
}
}

func (s *localService) KnownClient(ipAddress string, tenID uint64) bool {
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.mu.knownTenants[ipAddress]
if ok {
_, ok = s.mu.knownTenants[ipAddress][tenID]
}
return ok
}

func (s *localService) addLocked(addr string) *limiter {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/sqlproxyccl/admitter/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Service interface {
AllowRequest(ipAddress string, now time.Time) error

// RequestSuccess records the result of a successful request.
RequestSuccess(ipAddress string)
RequestSuccess(ipAddress string, tenID uint64)

// KnownClient checks if this client has connected successfully before.
KnownClient(ipAddress string, tenID uint64) bool
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/sqlproxyccl/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ const (
// CodeClientDisconnected indicates that the client disconnected unexpectedly
// (with a connection error) while in a session with backend SQL server.
CodeClientDisconnected

// CodeProxyRefusedConnection indicates that the proxy refused the connection
// request due to high load or too many connection attempts.
CodeProxyRefusedConnection
)

type codeError struct {
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/sqlproxyccl/errorcode_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions pkg/ccl/sqlproxyccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Metrics struct {
ClientDisconnectCount *metric.Counter
CurConnCount *metric.Gauge
RoutingErrCount *metric.Counter
RefusedConnCount *metric.Counter
}

// MetricStruct implements the metrics.Struct interface.
Expand Down Expand Up @@ -56,6 +57,12 @@ var (
Measurement: "Disconnects",
Unit: metric.Unit_COUNT,
}
metaRefusedConnCount = metric.Metadata{
Name: "proxy.err.refused_conn",
Help: "Number of refused connections initiated by a given IP",
Measurement: "Refused",
Unit: metric.Unit_COUNT,
}
)

// MakeProxyMetrics instantiates the metrics holder for proxy monitoring.
Expand All @@ -66,5 +73,6 @@ func MakeProxyMetrics() Metrics {
ClientDisconnectCount: metric.NewCounter(metaClientDisconnectCount),
CurConnCount: metric.NewGauge(metaCurConnCount),
RoutingErrCount: metric.NewCounter(metaRoutingErrCount),
RefusedConnCount: metric.NewCounter(metaBackendDisconnectCount),
}
}
32 changes: 32 additions & 0 deletions pkg/ccl/sqlproxyccl/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"io"
"net"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/admitter"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/jackc/pgproto3/v2"
)

Expand All @@ -31,10 +33,13 @@ type Options struct {
// allow use of SNI. Should always return ("", nil).
OutgoingAddrFromSNI func(serverName string) (addr string, clientErr error)
OutgoingAddrFromParams func(map[string]string) (addr string, clientErr error)
TenantFromParams func(map[string]string) (tenID uint64, clientErr error)

// If set, consulted to decorate an error message to be sent to the client.
// The error passed to this method will contain no internal information.
OnSendErrToClient func(code ErrorCode, msg string) string

admitter admitter.Service
}

// Proxy takes an incoming client connection and relays it to a backend SQL
Expand Down Expand Up @@ -111,6 +116,29 @@ func (s *Server) Proxy(conn net.Conn) error {
return newErrorf(code, "rejected by OutgoingAddrFromParams: %v", clientErr)
}

var ip string
var tenID uint64
if s.admitter != nil {
ip := conn.RemoteAddr().String()
tenID, clientErr = s.opts.TenantFromParams(msg.Parameters)
if clientErr != nil {
s.metrics.RoutingErrCount.Inc(1)
code := CodeParamsRoutingFailed
sendErrToClient(conn, code, clientErr.Error())
return newErrorf(code, "rejected by TenantFromParams: %v", clientErr)
}

// If a previous successful connection from this IP to the given client was
// made then admit immediately.
if !s.admitter.KnownClient(ip, tenID) {
// Otherwise rate limit the attempt from an unknown client.
if err := s.admitter.AllowRequest(ip, timeutil.Now()); err != nil {
s.metrics.RefusedConnCount.Inc(1)
return newErrorf(CodeProxyRefusedConnection, "too many connection attempts")
}
}
}

crdbConn, err := net.Dial("tcp", outgoingAddr)
if err != nil {
s.metrics.BackendDownCount.Inc(1)
Expand Down Expand Up @@ -145,6 +173,10 @@ func (s *Server) Proxy(conn net.Conn) error {
return newErrorf(CodeBackendDown, "relaying StartupMessage to target server %v: %v", outgoingAddr, err)
}

if s.admitter != nil {
s.admitter.RequestSuccess(ip, tenID)
}

// These channels are buffered because we'll only consume one of them.
errOutgoing := make(chan error, 1)
errIncoming := make(chan error, 1)
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/sqlproxyccl/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func TestFailedConnection(t *testing.T) {
ac := makeAssertCtx()
opts := Options{
OutgoingAddrFromParams: testingTenantIDFromDatabaseForAddr("undialable%$!@$", "29"),
TenantFromParams: func(mm map[string]string) (uint64, error) { return 29, nil },
OnSendErrToClient: ac.onSendErrToClient,
}
addr, done := setupTestProxyWithCerts(t, &opts)
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/sqlproxyccl/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net/http"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/admitter"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -31,6 +32,7 @@ type Server struct {
mux *http.ServeMux
metrics *Metrics
metricsRegistry *metric.Registry
admitter admitter.Service

promMu syncutil.Mutex
prometheusExporter metric.PrometheusExporter
Expand All @@ -52,6 +54,7 @@ func NewServer(opts Options) *Server {
mux: mux,
metrics: &proxyMetrics,
metricsRegistry: registry,
admitter: admitter.NewLocalService(),
prometheusExporter: metric.MakePrometheusExporter(),
}

Expand Down

0 comments on commit df9b92f

Please sign in to comment.