Skip to content

Commit

Permalink
ccl/sqlproxyccl: add PROXY protocol support via CLI flag to sqlproxy
Browse files Browse the repository at this point in the history
This commits adds a new `require-proxy-protocol` flag to `mt start-proxy`, and
that changes the sqlproxy's behavior to support the PROXY protocol. When the
flag is set, the protocol will be enforced on the SQL listener, and supported
on a best-effort basis on the HTTP listener. If the PROXY protocol isn't used,
but is enforced, the connection will be rejected. The rationale behind doing
best-effort basis on the HTTP listener is that some healthcheck systems don't
support the protocol.

This work is needed for the AWS PrivateLink work in CockroachCloud, which
requires the use of the PROXY protocol.

Release note: None

Epic: none

Release justification: SQL Proxy change only. Changes are needed for the AWS
PrivateLink work in CockroachCloud.
  • Loading branch information
jaylim-crl committed Mar 30, 2023
1 parent 2414bba commit 374ed5e
Show file tree
Hide file tree
Showing 12 changed files with 224 additions and 6 deletions.
6 changes: 3 additions & 3 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -7117,10 +7117,10 @@ def go_deps():
name = "com_github_pires_go_proxyproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pires/go-proxyproto",
sha256 = "ad00aa7f321a1b560d2c6fb33f0bfaf2a0ce056a25c9a756cbd198b655fcbbf9",
strip_prefix = "github.com/pires/go-proxyproto@v0.0.0-20191211124218-517ecdf5bb2b",
sha256 = "5ba5921ebf2f5d1186268740ebf6e594e4512fcbb503f2974b1038781a5920f8",
strip_prefix = "github.com/pires/go-proxyproto@v0.7.0",
urls = [
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/pires/go-proxyproto/com_github_pires_go_proxyproto-v0.0.0-20191211124218-517ecdf5bb2b.zip",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/pires/go-proxyproto/com_github_pires_go_proxyproto-v0.7.0.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion build/bazelutil/distdir_files.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ DISTDIR_FILES = {
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/pierrre/compare/com_github_pierrre_compare-v1.0.2.zip": "99af9543f52487c6e7015721def85aa2d9eb7661e37b151f1db91875dcda2ee7",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/pierrre/geohash/com_github_pierrre_geohash-v1.0.0.zip": "8c94a7e1f93170b53cf6e9d615967c24ff5342d5182d510f4829b3f39e249b4d",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/pingcap/errors/com_github_pingcap_errors-v0.11.4.zip": "df62e548162429501a88d936a3e8330f2379ddfcd4d23c22b78bc1b157e05b97",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/pires/go-proxyproto/com_github_pires_go_proxyproto-v0.0.0-20191211124218-517ecdf5bb2b.zip": "ad00aa7f321a1b560d2c6fb33f0bfaf2a0ce056a25c9a756cbd198b655fcbbf9",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/pires/go-proxyproto/com_github_pires_go_proxyproto-v0.7.0.zip": "5ba5921ebf2f5d1186268740ebf6e594e4512fcbb503f2974b1038781a5920f8",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/pkg/browser/com_github_pkg_browser-v0.0.0-20210115035449-ce105d075bb4.zip": "84db38d8db553ccc34c75f867396126eac07774b979c470f97a20854d3a3af6d",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/pkg/diff/com_github_pkg_diff-v0.0.0-20210226163009-20ebb0f2a09e.zip": "f35b23fdd2b9522ddd46cc5c0161b4f0765c514475d5d4ca2a86aca31388c8bd",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/pkg/errors/com_github_pkg_errors-v0.9.1.zip": "d4c36b8bcd0616290a3913215e0f53b931bd6e00670596f2960df1b44af2bd07",
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ require (
github.com/petermattis/goid v0.0.0-20211229010228-4d14c490ee36
github.com/pierrec/lz4/v4 v4.1.17
github.com/pierrre/geohash v1.0.0
github.com/pires/go-proxyproto v0.7.0
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4
github.com/pmezard/go-difflib v1.0.0
github.com/pressly/goose/v3 v3.5.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1895,6 +1895,8 @@ github.com/pierrre/geohash v1.0.0/go.mod h1:atytaeVa21hj5F6kMebHYPf8JbIrGxK2FSzN
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pires/go-proxyproto v0.0.0-20191211124218-517ecdf5bb2b/go.mod h1:Odh9VFOZJCf9G8cLW5o435Xf1J95Jw9Gw5rnCjcwzAY=
github.com/pires/go-proxyproto v0.7.0 h1:IukmRewDQFWC7kfnb66CSomk2q/seBuilHBYFwyq0Hs=
github.com/pires/go-proxyproto v0.7.0/go.mod h1:Vz/1JPY/OACxWGQNIRY2BeyDmpoaWmEP40O9LbuiFR4=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA=
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 h1:Qj1ukM4GlMWXNdMBuXcXfz/Kw9s1qm0CLY32QxuSImI=
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqggOuDwUaBQpluzLNDqif3kq9z2wpdYEfQ=
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/cliccl/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func setProxyContextDefaults() {
proxyContext.PollConfigInterval = 30 * time.Second
proxyContext.ThrottleBaseDelay = time.Second
proxyContext.DisableConnectionRebalancing = false
proxyContext.RequireProxyProtocol = false
}

var testDirectorySvrContext struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/cliccl/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func init() {
cliflagcfg.DurationFlag(f, &proxyContext.PollConfigInterval, cliflags.PollConfigInterval)
cliflagcfg.DurationFlag(f, &proxyContext.ThrottleBaseDelay, cliflags.ThrottleBaseDelay)
cliflagcfg.BoolFlag(f, &proxyContext.DisableConnectionRebalancing, cliflags.DisableConnectionRebalancing)
cliflagcfg.BoolFlag(f, &proxyContext.RequireProxyProtocol, cliflags.RequireProxyProtocol)
}

// Multi-tenancy test directory command flags.
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ go_library(
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_jackc_pgproto3_v2//:pgproto3",
"@com_github_pires_go_proxyproto//:go-proxyproto",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials/insecure",
Expand Down Expand Up @@ -118,6 +119,7 @@ go_test(
"@com_github_jackc_pgconn//:pgconn",
"@com_github_jackc_pgproto3_v2//:pgproto3",
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_pires_go_proxyproto//:go-proxyproto",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@in_gopkg_yaml_v3//:yaml_v3",
Expand Down
9 changes: 9 additions & 0 deletions pkg/ccl/sqlproxyccl/proxy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/jackc/pgproto3/v2"
proxyproto "github.com/pires/go-proxyproto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
Expand Down Expand Up @@ -108,6 +109,11 @@ type ProxyOptions struct {
ThrottleBaseDelay time.Duration
// DisableConnectionRebalancing disables connection rebalancing for tenants.
DisableConnectionRebalancing bool
// RequireProxyProtocol changes the server's behavior to support the PROXY
// protocol (SQL=required, HTTP=best-effort). With this set to true, the
// PROXY info from upstream will be trusted on both HTTP and SQL, if the
// headers are allowed.
RequireProxyProtocol bool

// testingKnobs are knobs used for testing.
testingKnobs struct {
Expand All @@ -123,6 +129,9 @@ type ProxyOptions struct {
// balancerOpts is used to customize the balancer created by the proxy.
balancerOpts []balancer.Option

// validateProxyHeader is used to validate the PROXY header.
validateProxyHeader proxyproto.Validator

httpCancelErrHandler func(err error)
}
}
Expand Down
163 changes: 163 additions & 0 deletions pkg/ccl/sqlproxyccl/proxy_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/jackc/pgconn"
pgproto3 "github.com/jackc/pgproto3/v2"
pgx "github.com/jackc/pgx/v4"
proxyproto "github.com/pires/go-proxyproto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
Expand All @@ -72,6 +73,138 @@ const backendError = "Backend error!"
// the test directory server.
const notFoundTenantID = 99

func TestProxyProtocol(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
te := newTester()
defer te.Close()

sql, db, _ := serverutils.StartServer(t, base.TestServerArgs{
Insecure: false,
// Need to disable the test tenant here because it appears as though
// we're not able to establish the necessary connections from within
// it. More investigation required (tracked with #76378).
DisableDefaultTestTenant: true,
})
sql.(*server.TestServer).PGPreServer().TestingSetTrustClientProvidedRemoteAddr(true)
pgs := sql.(*server.TestServer).PGServer().(*pgwire.Server)
pgs.TestingEnableAuthLogging()
defer sql.Stopper().Stop(ctx)

// Create a default user.
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE USER bob WITH PASSWORD 'builder'`)

var validateFn func(h *proxyproto.Header) error
withProxyProtocol := func(p bool) (server *Server, addr, httpAddr string) {
options := &ProxyOptions{
RoutingRule: sql.ServingSQLAddr(),
SkipVerify: true,
RequireProxyProtocol: p,
}
options.testingKnobs.validateProxyHeader = func(h *proxyproto.Header) error {
return validateFn(h)
}
return newSecureProxyServer(ctx, t, sql.Stopper(), options)
}

timeout := 3 * time.Second
proxyDialer := func(ctx context.Context, network, addr string) (net.Conn, error) {
conn, err := (&net.Dialer{Timeout: timeout}).Dial(network, addr)
if err != nil {
return nil, err
}
header := &proxyproto.Header{
Version: 2,
Command: proxyproto.PROXY,
TransportProtocol: proxyproto.TCPv4,
SourceAddr: &net.TCPAddr{
// Use a dummy address so we can check on that.
IP: net.ParseIP("10.20.30.40"),
Port: 4242,
},
DestinationAddr: conn.RemoteAddr(),
}
if err := conn.SetWriteDeadline(timeutil.Now().Add(timeout)); err != nil {
return nil, err
}
_, err = header.WriteTo(conn)
if err != nil {
return nil, err
}
return conn, nil
}

makeHttpReq := func(t *testing.T, client *http.Client, addr string, success bool) {
resp, err := client.Get(fmt.Sprintf("http://%s/_status/healthz/", addr))
require.NoError(t, err)
defer resp.Body.Close()
if success {
require.Equal(t, "200 OK", resp.Status)
} else {
require.Equal(t, "400 Bad Request", resp.Status)
}
}

t.Run("allow=true", func(t *testing.T) {
s, sqlAddr, httpAddr := withProxyProtocol(true)

defer testutils.TestingHook(&validateFn, func(h *proxyproto.Header) error {
if h.SourceAddr.String() != "10.20.30.40:4242" {
return errors.Newf("got source addr %s, expected 10.20.30.40:4242", h.SourceAddr)
}
return nil
})()

// Test SQL. Only request with PROXY should go through.
url := fmt.Sprintf("postgres://bob:builder@%s/tenant-cluster-42.defaultdb?sslmode=require", sqlAddr)
te.TestConnectWithPGConfig(
ctx, t, url,
func(c *pgx.ConnConfig) {
c.DialFunc = proxyDialer
},
func(conn *pgx.Conn) {
require.Equal(t, int64(1), s.metrics.CurConnCount.Value())
require.NoError(t, runTestQuery(ctx, conn))
},
)
_ = te.TestConnectErr(ctx, t, url, codeClientReadFailed, "tls error")

// Test HTTP. Should support with or without PROXY.
client := http.Client{Timeout: timeout}
makeHttpReq(t, &client, httpAddr, true)
client.Transport = &http.Transport{DialContext: proxyDialer}
makeHttpReq(t, &client, httpAddr, true)
})

t.Run("allow=false", func(t *testing.T) {
s, sqlAddr, httpAddr := withProxyProtocol(false)

// Test SQL. Only request without PROXY should go through.
url := fmt.Sprintf("postgres://bob:builder@%s/tenant-cluster-42.defaultdb?sslmode=require", sqlAddr)
te.TestConnect(ctx, t, url, func(conn *pgx.Conn) {
require.Equal(t, int64(1), s.metrics.CurConnCount.Value())
require.NoError(t, runTestQuery(ctx, conn))
})
_ = te.TestConnectErrWithPGConfig(
ctx, t, url,
func(c *pgx.ConnConfig) {
c.DialFunc = proxyDialer
},
codeClientReadFailed,
"tls error",
)

// Test HTTP.
client := http.Client{Timeout: timeout}
makeHttpReq(t, &client, httpAddr, true)
client.Transport = &http.Transport{DialContext: proxyDialer}
makeHttpReq(t, &client, httpAddr, false)
})
}

func TestLongDBName(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -2293,6 +2426,16 @@ func (te *tester) setErrToClient(codeErr error) {
// established connection. Use TestConnectErr if connection establishment isn't
// expected to succeed.
func (te *tester) TestConnect(ctx context.Context, t *testing.T, url string, fn func(*pgx.Conn)) {
te.TestConnectWithPGConfig(ctx, t, url, nil, fn)
}

// TestConnectWithPGConfig connects to the given URL and invokes the given
// callbacks with the established connection. Unlike TestConnect, this takes in
// a custom callback function that allows callers to modify the PG config before
// making the connection.
func (te *tester) TestConnectWithPGConfig(
ctx context.Context, t *testing.T, url string, configFn func(*pgx.ConnConfig), fn func(*pgx.Conn),
) {
t.Helper()
te.setAuthenticated(false)
te.setErrToClient(nil)
Expand All @@ -2302,6 +2445,9 @@ func (te *tester) TestConnect(ctx context.Context, t *testing.T, url string, fn
connConfig.TLSConfig.ServerName = connConfig.Host
connConfig.Host = "127.0.0.1"
}
if configFn != nil {
configFn(connConfig)
}
conn, err := pgx.ConnectConfig(ctx, connConfig)
require.NoError(t, err)
fn(conn)
Expand All @@ -2314,6 +2460,20 @@ func (te *tester) TestConnect(ctx context.Context, t *testing.T, url string, fn
// an error to occur and validates the error matches the provided information.
func (te *tester) TestConnectErr(
ctx context.Context, t *testing.T, url string, expCode errorCode, expErr string,
) error {
return te.TestConnectErrWithPGConfig(ctx, t, url, nil, expCode, expErr)
}

// TestConnectErrWithPGConfig is similar to TestConnectErr, but takes in a
// custom callback to modify connection config parameters before establishing
// the connection.
func (te *tester) TestConnectErrWithPGConfig(
ctx context.Context,
t *testing.T,
url string,
configFn func(*pgx.ConnConfig),
expCode errorCode,
expErr string,
) error {
t.Helper()
te.setAuthenticated(false)
Expand All @@ -2327,6 +2487,9 @@ func (te *tester) TestConnectErr(
cfg.TLSConfig.ServerName = cfg.Host
cfg.Host = "127.0.0.1"
}
if configFn != nil {
configFn(cfg)
}
conn, err := pgx.ConnectConfig(ctx, cfg)
if err == nil {
_ = conn.Close(ctx)
Expand Down
31 changes: 29 additions & 2 deletions pkg/ccl/sqlproxyccl/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
proxyproto "github.com/pires/go-proxyproto"
)

var (
Expand Down Expand Up @@ -162,10 +163,24 @@ func (s *Server) handleCancel(w http.ResponseWriter, r *http.Request) {
// a health check endpoint at /_status/healthz, and pprof debug
// endpoints at /debug/pprof.
func (s *Server) ServeHTTP(ctx context.Context, ln net.Listener) error {
srv := http.Server{
Handler: s.mux,
if s.handler.RequireProxyProtocol {
ln = &proxyproto.Listener{
Listener: ln,
Policy: func(upstream net.Addr) (proxyproto.Policy, error) {
// There is a possibility where components doing healthchecking
// (e.g. Kubernetes) do not support the PROXY protocol directly.
// We use the `USE` policy here (which is also the default) to
// optionally allow the PROXY protocol to be supported. If a
// connection doesn't have the proxy headers, it'll just be
// treated as a regular one.
return proxyproto.USE, nil
},
ValidateHeader: s.handler.testingKnobs.validateProxyHeader,
}
}

srv := http.Server{Handler: s.mux}

go func() {
<-ctx.Done()

Expand Down Expand Up @@ -198,6 +213,18 @@ func (s *Server) ServeHTTP(ctx context.Context, ln net.Listener) error {
// Incoming client connections are taken through the Postgres handshake and
// relayed to the configured backend server.
func (s *Server) Serve(ctx context.Context, ln net.Listener) error {
if s.handler.RequireProxyProtocol {
ln = &proxyproto.Listener{
Listener: ln,
Policy: func(upstream net.Addr) (proxyproto.Policy, error) {
// REQUIRE enforces the connection to send a PROXY header.
// The connection will be rejected if one was not present.
return proxyproto.REQUIRE, nil
},
ValidateHeader: s.handler.testingKnobs.validateProxyHeader,
}
}

err := s.Stopper.RunAsyncTask(ctx, "listen-quiesce", func(ctx context.Context) {
<-s.Stopper.ShouldQuiesce()
if err := ln.Close(); err != nil && !grpcutil.IsClosedConnection(err) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/sqlproxyccl/tenantdirsvr/test_simple_directory_svr.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func (d *TestSimpleDirectoryServer) ListPods(
func (d *TestSimpleDirectoryServer) WatchPods(
req *tenant.WatchPodsRequest, server tenant.Directory_WatchPodsServer,
) error {
// Insted of returning right away, we block until context is done.
// This prevents the proxy server from constantly trying to establish
// a watch in test environments, causing spammy logs.
<-server.Context().Done()
return nil
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/cli/cliflags/flags_mt.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ var (
Description: "If true, proxy will not attempt to rebalance connections.",
}

RequireProxyProtocol = FlagInfo{
Name: "require-proxy-protocol",
Description: `Requires PROXY protocol on the SQL listener. The HTTP
listener will support the protocol on a best-effort basis. If this is set to
true, the PROXY info from upstream will be trusted on both SQL and HTTP
listeners, if the headers are allowed.`,
}

RatelimitBaseDelay = FlagInfo{
Name: "ratelimit-base-delay",
Description: "Initial backoff after a failed login attempt. Set to 0 to disable rate limiting.",
Expand Down

0 comments on commit 374ed5e

Please sign in to comment.