Skip to content

Commit

Permalink
Merge #56192
Browse files Browse the repository at this point in the history
56192: [CC-3025] sqlproxy: rate limit proxy connections by IP r=spaskob a=spaskob

Add a new admitter package which performs basic connection admission
control to rate limit connection attempts. Connection attempts are
rate limited based on source IP. Admission control is currently
purely a local decision, but the package this interface defines could
easily become the client-side for connection to a centralized service.

Release note: none.

Co-authored-by: Spas Bojanov <[email protected]>
  • Loading branch information
craig[bot] and Spas Bojanov committed Nov 13, 2020
2 parents 06a70e0 + 9349bd4 commit 5b69ef1
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 70 deletions.
14 changes: 10 additions & 4 deletions pkg/ccl/cliccl/mtproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,22 @@ Uuwb2FVdh76ZK0AVd3Jh3KJs4+hr2u9syHaa7UPKXTcZsFWlGwZuu6X5A+0SO0S2
IncomingTLSConfig: &tls.Config{
Certificates: []tls.Certificate{cer},
},
BackendFromParams: func(params map[string]string) (addr string, conf *tls.Config, clientErr error) {
BackendConfigFromParams: func(
params map[string]string, ipAddress string,
) (config *sqlproxyccl.BackendConfig, clientErr error) {
const magic = "prancing-pony"
cfg := &sqlproxyccl.BackendConfig{
OutgoingAddress: sqlProxyTargetAddr,
TLSConf: outgoingConf,
}
if strings.HasPrefix(params["database"], magic+".") {
params["database"] = params["database"][len(magic)+1:]
return sqlProxyTargetAddr, outgoingConf, nil
return cfg, nil
}
if params["options"] == "--cluster="+magic {
return sqlProxyTargetAddr, outgoingConf, nil
return cfg, nil
}
return "", nil, errors.Errorf("client failed to pass '%s' via database or options", magic)
return nil, errors.Errorf("client failed to pass '%s' via database or options", magic)
},
})

Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/sqlproxyccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/randutil",
Expand Down
7 changes: 6 additions & 1 deletion pkg/ccl/sqlproxyccl/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ const (
// CodeClientDisconnected indicates that the client disconnected unexpectedly
// (with a connection error) while in a session with backend SQL server.
CodeClientDisconnected

// CodeProxyRefusedConnection indicates that the proxy refused the connection
// request due to high load or too many connection attempts.
CodeProxyRefusedConnection
)

type codeError struct {
Expand All @@ -69,7 +73,8 @@ func (e *codeError) Error() string {
return fmt.Sprintf("%s: %s", e.code, e.err)
}

func newErrorf(code ErrorCode, format string, args ...interface{}) error {
// NewErrorf returns a new codeError out of the supplied args.
func NewErrorf(code ErrorCode, format string, args ...interface{}) error {
return &codeError{
code: code,
err: errors.Errorf(format, args...),
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/sqlproxyccl/errorcode_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions pkg/ccl/sqlproxyccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Metrics struct {
ClientDisconnectCount *metric.Counter
CurConnCount *metric.Gauge
RoutingErrCount *metric.Counter
RefusedConnCount *metric.Counter
}

// MetricStruct implements the metrics.Struct interface.
Expand Down Expand Up @@ -56,6 +57,12 @@ var (
Measurement: "Disconnects",
Unit: metric.Unit_COUNT,
}
metaRefusedConnCount = metric.Metadata{
Name: "proxy.err.refused_conn",
Help: "Number of refused connections initiated by a given IP",
Measurement: "Refused",
Unit: metric.Unit_COUNT,
}
)

// MakeProxyMetrics instantiates the metrics holder for proxy monitoring.
Expand All @@ -66,5 +73,6 @@ func MakeProxyMetrics() Metrics {
ClientDisconnectCount: metric.NewCounter(metaClientDisconnectCount),
CurConnCount: metric.NewGauge(metaCurConnCount),
RoutingErrCount: metric.NewCounter(metaRoutingErrCount),
RefusedConnCount: metric.NewCounter(metaRefusedConnCount),
}
}
99 changes: 69 additions & 30 deletions pkg/ccl/sqlproxyccl/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"io"
"net"

"github.com/cockroachdb/errors"
"github.com/jackc/pgproto3/v2"
)

Expand All @@ -22,17 +23,30 @@ const pgAcceptSSLRequest = 'S'
// See https://www.postgresql.org/docs/9.1/protocol-message-formats.html.
var pgSSLRequest = []int32{8, 80877103}

// BackendConfig contains the configuration of a backend connection that is
// being proxied.
type BackendConfig struct {
// The address to which the connection is forwarded.
OutgoingAddress string
// TLS settings to use when connecting to OutgoingAddress.
TLSConf *tls.Config
// Called after successfully connecting to OutgoingAddr.
OnConnectionSuccess func()
}

// Options are the options to the Proxy method.
type Options struct {
IncomingTLSConfig *tls.Config // config used for client -> proxy connection

// TODO(tbg): this is unimplemented and exists only to check which clients
// allow use of SNI. Should always return ("", nil).
BackendFromSNI func(serverName string) (addr string, conf *tls.Config, clientErr error)
// BackendFromParams returns the address and TLS config to use for
// the proxy -> backend connection. The returned config must have
// an appropriate ServerName for the remote backend.
BackendFromParams func(map[string]string) (addr string, conf *tls.Config, clientErr error)
BackendConfigFromSNI func(serverName string) (config *BackendConfig, clientErr error)
// BackendFromParams returns the config to use for the proxy -> backend
// connection. The TLS config is in it and it must have an appropriate
// ServerName for the remote backend.
BackendConfigFromParams func(
params map[string]string, ipAddress string,
) (config *BackendConfig, clientErr error)

// If set, consulted to modify the parameters set by the frontend before
// forwarding them to the backend during startup.
Expand Down Expand Up @@ -60,7 +74,7 @@ func (s *Server) Proxy(conn net.Conn) error {
{
m, err := pgproto3.NewBackend(pgproto3.NewChunkReader(conn), conn).ReceiveStartupMessage()
if err != nil {
return newErrorf(CodeClientReadFailed, "while receiving startup message")
return NewErrorf(CodeClientReadFailed, "while receiving startup message")
}
switch m.(type) {
case *pgproto3.SSLRequest:
Expand All @@ -72,12 +86,12 @@ func (s *Server) Proxy(conn net.Conn) error {
default:
code := CodeUnexpectedInsecureStartupMessage
sendErrToClient(conn, code, "server requires encryption")
return newErrorf(code, "unsupported startup message: %T", m)
return NewErrorf(code, "unsupported startup message: %T", m)
}

_, err = conn.Write([]byte{pgAcceptSSLRequest})
if err != nil {
return newErrorf(CodeClientWriteFailed, "acking SSLRequest: %v", err)
return NewErrorf(CodeClientWriteFailed, "acking SSLRequest: %v", err)
}

cfg := s.opts.IncomingTLSConfig.Clone()
Expand All @@ -86,63 +100,83 @@ func (s *Server) Proxy(conn net.Conn) error {
sniServerName = h.ServerName
return nil, nil
}
if s.opts.BackendFromSNI != nil {
addr, _, clientErr := s.opts.BackendFromSNI(sniServerName)
if s.opts.BackendConfigFromSNI != nil {
cfg, clientErr := s.opts.BackendConfigFromSNI(sniServerName)
if clientErr != nil {
code := CodeSNIRoutingFailed
sendErrToClient(conn, code, clientErr.Error()) // won't actually be shown by most clients
return newErrorf(code, "rejected by OutgoingAddrFromSNI")
return NewErrorf(code, "rejected by OutgoingAddrFromSNI")
}
if addr != "" {
return newErrorf(CodeSNIRoutingFailed, "BackendFromSNI is unimplemented")
if cfg.OutgoingAddress != "" {
return NewErrorf(CodeSNIRoutingFailed, "BackendConfigFromSNI is unimplemented")
}
}
conn = tls.Server(conn, cfg)
}

m, err := pgproto3.NewBackend(pgproto3.NewChunkReader(conn), conn).ReceiveStartupMessage()
if err != nil {
return newErrorf(CodeClientReadFailed, "receiving post-TLS startup message: %v", err)
return NewErrorf(CodeClientReadFailed, "receiving post-TLS startup message: %v", err)
}
msg, ok := m.(*pgproto3.StartupMessage)
if !ok {
return newErrorf(CodeUnexpectedStartupMessage, "unsupported post-TLS startup message: %T", m)
return NewErrorf(CodeUnexpectedStartupMessage, "unsupported post-TLS startup message: %T", m)
}

outgoingAddr, outgoingTLS, clientErr := s.opts.BackendFromParams(msg.Parameters)
if clientErr != nil {
s.metrics.RoutingErrCount.Inc(1)
code := CodeParamsRoutingFailed
sendErrToClient(conn, code, clientErr.Error())
return newErrorf(code, "rejected by OutgoingAddrFromParams: %v", clientErr)
var backendConfig *BackendConfig
{
ip, _, err := net.SplitHostPort(conn.RemoteAddr().String())
if err != nil {
return NewErrorf(
CodeParamsRoutingFailed, "could not parse address %s: %v",
conn.RemoteAddr().String(), err)
}
var clientErr error
backendConfig, clientErr = s.opts.BackendConfigFromParams(msg.Parameters, ip)
if clientErr != nil {
var codeErr *codeError
if !errors.As(clientErr, &codeErr) {
codeErr = &codeError{
code: CodeParamsRoutingFailed,
err: errors.Errorf("rejected by BackendConfigFromParams: %v", clientErr),
}
}
if codeErr.code == CodeProxyRefusedConnection {
s.metrics.RefusedConnCount.Inc(1)
} else {
s.metrics.RoutingErrCount.Inc(1)
}
sendErrToClient(conn, codeErr.code, clientErr.Error())
return codeErr
}
}

crdbConn, err := net.Dial("tcp", outgoingAddr)
crdbConn, err := net.Dial("tcp", backendConfig.OutgoingAddress)
if err != nil {
s.metrics.BackendDownCount.Inc(1)
code := CodeBackendDown
sendErrToClient(conn, code, "unable to reach backend SQL server")
return newErrorf(code, "dialing backend server: %v", err)
return NewErrorf(code, "dialing backend server: %v", err)
}

// Send SSLRequest.
if err := binary.Write(crdbConn, binary.BigEndian, pgSSLRequest); err != nil {
s.metrics.BackendDownCount.Inc(1)
return newErrorf(CodeBackendDown, "sending SSLRequest to target server: %v", err)
return NewErrorf(CodeBackendDown, "sending SSLRequest to target server: %v", err)
}

response := make([]byte, 1)
if _, err = io.ReadFull(crdbConn, response); err != nil {
s.metrics.BackendDownCount.Inc(1)
return newErrorf(CodeBackendDown, "reading response to SSLRequest")
return NewErrorf(CodeBackendDown, "reading response to SSLRequest")
}

if response[0] != pgAcceptSSLRequest {
s.metrics.BackendDownCount.Inc(1)
return newErrorf(CodeBackendRefusedTLS, "target server refused TLS connection")
return NewErrorf(CodeBackendRefusedTLS, "target server refused TLS connection")
}

outCfg := outgoingTLS.Clone()
outCfg := backendConfig.TLSConf.Clone()
crdbConn = tls.Client(crdbConn, outCfg)

if s.opts.ModifyRequestParams != nil {
Expand All @@ -151,7 +185,12 @@ func (s *Server) Proxy(conn net.Conn) error {

if _, err := crdbConn.Write(msg.Encode(nil)); err != nil {
s.metrics.BackendDownCount.Inc(1)
return newErrorf(CodeBackendDown, "relaying StartupMessage to target server %v: %v", outgoingAddr, err)
return NewErrorf(CodeBackendDown, "relaying StartupMessage to target server %v: %v",
backendConfig.OutgoingAddress, err)
}

if backendConfig.OnConnectionSuccess != nil {
backendConfig.OnConnectionSuccess()
}

// These channels are buffered because we'll only consume one of them.
Expand All @@ -177,14 +216,14 @@ func (s *Server) Proxy(conn net.Conn) error {
case err := <-errIncoming:
if err != nil {
s.metrics.BackendDisconnectCount.Inc(1)
return newErrorf(CodeBackendDisconnected, "copying from target server to client: %s", err)
return NewErrorf(CodeBackendDisconnected, "copying from target server to client: %s", err)
}
return nil
case err := <-errOutgoing:
// The incoming connection got closed.
if err != nil {
s.metrics.ClientDisconnectCount.Inc(1)
return newErrorf(CodeClientDisconnected, "copying from target server to client: %v", err)
return NewErrorf(CodeClientDisconnected, "copying from target server to client: %v", err)
}
return nil
}
Expand Down
Loading

0 comments on commit 5b69ef1

Please sign in to comment.