Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for min ready instances #1496

Merged
merged 16 commits into from
Nov 8, 2022
25 changes: 24 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,27 @@ Instance Level Configuration
my-project:us-central1:my-db-server \
'my-project:us-central1:my-other-server?address=0.0.0.0&port=7000'

Health checks

When enabling the --health-checks flag, the proxy will start an HTTP server
on localhost with three endpoints:

- /startup: Returns 200 status when the proxy has finished starting up.
Otherwise returns 503 status.

- /readiness: Returns 200 status when the proxy has started, has available
connections if max connections have been set with the --max-connections
flag, and when the proxy can connect to all registered instances. Otherwise,
returns a 503 status. Optionally supports a min-ready query param (e.g.,
/readiness?min-ready=3) where the proxy will return a 200 status if the
proxy can connect successfully to at least min-ready number of instances. If
min-ready exceeds the number of registered instances, returns a 400.

- /liveness: Always returns 200 status. If this endpoint is not responding,
the proxy is in a bad state and should be restarted.

To configure the address, use --http-server.

Service Account Impersonation

The proxy supports service account impersonation with the
Expand Down Expand Up @@ -626,6 +647,8 @@ func runSignalWrapper(cmd *Command) error {
notify := func() {}
if cmd.healthCheck {
needsHTTPServer = true
cmd.logger.Infof("Starting health check server at %s",
net.JoinHostPort(cmd.httpAddress, cmd.httpPort))
hc := healthcheck.NewCheck(p, cmd.logger)
mux.HandleFunc("/startup", hc.HandleStartup)
mux.HandleFunc("/readiness", hc.HandleReadiness)
Expand All @@ -636,7 +659,7 @@ func runSignalWrapper(cmd *Command) error {
// Start the HTTP server if anything requiring HTTP is specified.
if needsHTTPServer {
server := &http.Server{
Addr: fmt.Sprintf("%s:%s", cmd.httpAddress, cmd.httpPort),
Addr: net.JoinHostPort(cmd.httpAddress, cmd.httpPort),
Handler: mux,
}
// Start the HTTP server.
Expand Down
59 changes: 53 additions & 6 deletions internal/healthcheck/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"net/http"
"strconv"
"sync"

"github.com/GoogleCloudPlatform/cloud-sql-proxy/v2/cloudsql"
Expand Down Expand Up @@ -67,7 +68,7 @@ var errNotStarted = errors.New("proxy is not started")
// HandleReadiness ensures the Check has been notified of successful startup,
// that the proxy has not reached maximum connections, and that all connections
// are healthy.
func (c *Check) HandleReadiness(w http.ResponseWriter, _ *http.Request) {
func (c *Check) HandleReadiness(w http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -88,18 +89,64 @@ func (c *Check) HandleReadiness(w http.ResponseWriter, _ *http.Request) {
return
}

err := c.proxy.CheckConnections(ctx)
if err != nil {
c.logger.Errorf("[Health Check] Readiness failed: %v", err)
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte(err.Error()))
var minReady *int
q := req.URL.Query()
if v := q.Get("min-ready"); v != "" {
n, err := strconv.Atoi(v)
if err != nil {
c.logger.Errorf("[Health Check] min-ready must be a valid integer, got = %q", v)
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "min-query must be a valid integer, got = %q", v)
return
}
if n <= 0 {
c.logger.Errorf("[Health Check] min-ready %q must be greater than zero", v)
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, "min-query must be greater than zero", v)
return
}
minReady = &n
}

n, err := c.proxy.CheckConnections(ctx)
if status, rErr := ready(err, minReady, n); rErr != nil {
c.logger.Errorf("[Health Check] Readiness failed: %v", rErr)
w.WriteHeader(status)
w.Write([]byte(rErr.Error()))
return
}

w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
}

func ready(err error, minReady *int, total int) (int, error) {
// If err is nil, then the proxy is ready.
enocom marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
if minReady != nil && *minReady > total {
return http.StatusBadRequest, fmt.Errorf(
"min-ready (%v) must be less than or equal to the number of registered instances (%v)",
*minReady, total,
)
}
return http.StatusOK, nil
}
// When minReady is not configured, any error means the proxy is not ready.
if minReady == nil {
return http.StatusServiceUnavailable, err
}
mErr, ok := err.(proxy.MultiErr)
if !ok {
return http.StatusServiceUnavailable, err
}
notReady := len(mErr)
areReady := total - notReady
if areReady < *minReady {
return http.StatusServiceUnavailable, err
}
return http.StatusOK, nil
}

// HandleLiveness indicates the process is up and responding to HTTP requests.
// If this check fails (because it's not reachable), the process is in a bad
// state and should be restarted.
Expand Down
128 changes: 114 additions & 14 deletions internal/healthcheck/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"net"
"net/http"
"net/http/httptest"
"net/url"
"os"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -71,6 +73,21 @@ func (*fakeDialer) Close() error {
return nil
}

type flakeyDialer struct {
dialCount uint64
fakeDialer
}

// Dial fails on odd calls and succeeds on even calls.
func (f *flakeyDialer) Dial(_ context.Context, _ string, _ ...cloudsqlconn.DialOption) (net.Conn, error) {
c := atomic.AddUint64(&f.dialCount, 1)
if c%2 == 0 {
conn, _ := net.Pipe()
return conn, nil
}
return nil, errors.New("flakey dialer fails on odd calls")
}

type errorDialer struct {
fakeDialer
}
Expand All @@ -79,13 +96,11 @@ func (*errorDialer) Dial(_ context.Context, _ string, _ ...cloudsqlconn.DialOpti
return nil, errors.New("errorDialer always errors")
}

func newProxyWithParams(t *testing.T, maxConns uint64, dialer cloudsql.Dialer) *proxy.Client {
func newProxyWithParams(t *testing.T, maxConns uint64, dialer cloudsql.Dialer, instances []proxy.InstanceConnConfig) *proxy.Client {
c := &proxy.Config{
Addr: proxyHost,
Port: proxyPort,
Instances: []proxy.InstanceConnConfig{
{Name: "proj:region:pg"},
},
Addr: proxyHost,
Port: proxyPort,
Instances: instances,
MaxConnections: maxConns,
}
p, err := proxy.NewClient(context.Background(), dialer, logger, c)
Expand All @@ -96,15 +111,17 @@ func newProxyWithParams(t *testing.T, maxConns uint64, dialer cloudsql.Dialer) *
}

func newTestProxyWithMaxConns(t *testing.T, maxConns uint64) *proxy.Client {
return newProxyWithParams(t, maxConns, &fakeDialer{})
return newProxyWithParams(t, maxConns, &fakeDialer{}, []proxy.InstanceConnConfig{
{Name: "proj:region:pg"},
})
}

func newTestProxyWithDialer(t *testing.T, d cloudsql.Dialer) *proxy.Client {
return newProxyWithParams(t, 0, d)
return newProxyWithParams(t, 0, d, []proxy.InstanceConnConfig{{Name: "proj:region:pg"}})
}

func newTestProxy(t *testing.T) *proxy.Client {
return newProxyWithParams(t, 0, &fakeDialer{})
return newProxyWithParams(t, 0, &fakeDialer{}, []proxy.InstanceConnConfig{{Name: "proj:region:pg"}})
}

func TestHandleStartupWhenNotNotified(t *testing.T) {
Expand All @@ -117,7 +134,7 @@ func TestHandleStartupWhenNotNotified(t *testing.T) {
check := healthcheck.NewCheck(p, logger)

rec := httptest.NewRecorder()
check.HandleStartup(rec, &http.Request{})
check.HandleStartup(rec, &http.Request{URL: &url.URL{}})

// Startup is not complete because the Check has not been notified of the
// proxy's startup.
Expand All @@ -139,7 +156,7 @@ func TestHandleStartupWhenNotified(t *testing.T) {
check.NotifyStarted()

rec := httptest.NewRecorder()
check.HandleStartup(rec, &http.Request{})
check.HandleStartup(rec, &http.Request{URL: &url.URL{}})

resp := rec.Result()
if got, want := resp.StatusCode, http.StatusOK; got != want {
Expand All @@ -157,7 +174,7 @@ func TestHandleReadinessWhenNotNotified(t *testing.T) {
check := healthcheck.NewCheck(p, logger)

rec := httptest.NewRecorder()
check.HandleReadiness(rec, &http.Request{})
check.HandleReadiness(rec, &http.Request{URL: &url.URL{}})

resp := rec.Result()
if got, want := resp.StatusCode, http.StatusServiceUnavailable; got != want {
Expand Down Expand Up @@ -193,13 +210,14 @@ func TestHandleReadinessForMaxConns(t *testing.T) {
waitForConnect := func(t *testing.T, wantCode int) *http.Response {
for i := 0; i < 10; i++ {
rec := httptest.NewRecorder()
check.HandleReadiness(rec, &http.Request{})
check.HandleReadiness(rec, &http.Request{URL: &url.URL{}})
resp := rec.Result()
if resp.StatusCode == wantCode {
return resp
}
time.Sleep(time.Second)
}
t.Fatalf("failed to receive status code = %v", wantCode)
return nil
}
resp := waitForConnect(t, http.StatusServiceUnavailable)
Expand All @@ -224,7 +242,7 @@ func TestHandleReadinessWithConnectionProblems(t *testing.T) {
check.NotifyStarted()

rec := httptest.NewRecorder()
check.HandleReadiness(rec, &http.Request{})
check.HandleReadiness(rec, &http.Request{URL: &url.URL{}})

resp := rec.Result()
if got, want := resp.StatusCode, http.StatusServiceUnavailable; got != want {
Expand All @@ -239,3 +257,85 @@ func TestHandleReadinessWithConnectionProblems(t *testing.T) {
t.Fatalf("want substring with = %q, got = %v", want, string(body))
}
}

func TestReadinessWithMinReady(t *testing.T) {
tcs := []struct {
desc string
minReady string
wantStatus int
dialer cloudsql.Dialer
}{
{
desc: "when min ready is zero",
minReady: "0",
wantStatus: http.StatusBadRequest,
dialer: &fakeDialer{},
},
{
desc: "when min ready is less than zero",
minReady: "-1",
wantStatus: http.StatusBadRequest,
dialer: &fakeDialer{},
},
{
desc: "when only one instance must be ready",
minReady: "1",
wantStatus: http.StatusOK,
dialer: &flakeyDialer{}, // fails on first call, succeeds on second
},
{
desc: "when all instances must be ready",
minReady: "2",
wantStatus: http.StatusServiceUnavailable,
dialer: &errorDialer{},
},
{
desc: "when min ready is greater than the number of instances",
minReady: "3",
wantStatus: http.StatusBadRequest,
dialer: &fakeDialer{},
},
{
desc: "when min ready is bogus",
enocom marked this conversation as resolved.
Show resolved Hide resolved
minReady: "bogus",
wantStatus: http.StatusBadRequest,
dialer: &fakeDialer{},
},
{
desc: "when min ready is not set",
minReady: "",
wantStatus: http.StatusOK,
dialer: &fakeDialer{},
},
}
enocom marked this conversation as resolved.
Show resolved Hide resolved
for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {
p := newProxyWithParams(t, 0,
tc.dialer,
[]proxy.InstanceConnConfig{
{Name: "p:r:instance-1"},
{Name: "p:r:instance-2"},
},
)
defer func() {
if err := p.Close(); err != nil {
t.Logf("failed to close proxy client: %v", err)
}
}()

check := healthcheck.NewCheck(p, logger)
check.NotifyStarted()
u, err := url.Parse(fmt.Sprintf("/readiness?min-ready=%s", tc.minReady))
if err != nil {
t.Fatal(err)
}
rec := httptest.NewRecorder()
check.HandleReadiness(rec, &http.Request{URL: u})

resp := rec.Result()
if got, want := resp.StatusCode, tc.wantStatus; got != want {
t.Fatalf("want = %v, got = %v", want, got)
}
})
}
}
6 changes: 5 additions & 1 deletion internal/proxy/fuse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,13 @@ func TestFUSECheckConnections(t *testing.T) {
conn := tryDialUnix(t, filepath.Join(fuseDir, "proj:reg:mysql"))
defer conn.Close()

if err := c.CheckConnections(context.Background()); err != nil {
n, err := c.CheckConnections(context.Background())
if err != nil {
t.Fatalf("c.CheckConnections(): %v", err)
}
if want, got := 1, n; want != got {
t.Fatalf("CheckConnections number of connections: want = %v, got = %v", want, got)
}

// verify the dialer was invoked twice, once for connect, once for check
// connection
Expand Down
Loading