Gracefully handle TERM signals #206

Merged: 9 commits, Oct 9, 2018
24 changes: 22 additions & 2 deletions cmd/cloud_sql_proxy/cloud_sql_proxy.go
@@ -27,9 +27,11 @@ import (
"log"
"net/http"
"os"
"os/signal"
"path/filepath"
"strings"
"sync"
"syscall"
"time"

"github.com/GoogleCloudPlatform/cloudsql-proxy/logging"
@@ -76,6 +78,7 @@ can be removed automatically by this program.`)
// Settings for limits
maxConnections = flag.Uint64("max_connections", 0, `If provided, the maximum number of connections to establish before refusing new connections. Defaults to 0 (no limit)`)
fdRlimit = flag.Uint64("fd_rlimit", limits.ExpectedFDs, `Sets the rlimit on the number of open file descriptors for the proxy to the provided value. If set to zero, disables attempts to set the rlimit. Defaults to a value which can support 4K connections to one instance`)
termTimeout = flag.Duration("term_timeout", 0, "When set, the proxy will wait for existing connections to close before terminating. Any connections that haven't closed after the timeout will be dropped")

// Settings for authentication.
token = flag.String("token", "", "When set, the proxy uses this Bearer token for authorization.")
@@ -482,7 +485,7 @@ func main() {
}
logging.Infof("Ready for new connections")

(&proxy.Client{
proxyClient := &proxy.Client{
Port: port,
MaxConnections: *maxConnections,
Certs: certs.NewCertSourceOpts(client, certs.RemoteOpts{
@@ -493,5 +496,22 @@ func main() {
}),
Conns: connset,
RefreshCfgThrottle: refreshCfgThrottle,
}).Run(connSrc)
}

signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
Contributor:

Do you need SIGTERM too, or is SIGINT just enough? I believe the more platform-independent way is to not use the syscall package but instead lean on the os package's os.Interrupt (https://golang.org/pkg/os/#Signal) which is equal to syscall.SIGINT.

I checked the windows versions and syscall.SIGINT and syscall.SIGTERM both exist, but who knows what SIGTERM means there and in other places. os.Interrupt == syscall.SIGINT for all platforms except plan9 anyway (https://golang.org/src/os/exec_posix.go?h=Interrupt#L18) so if SIGINT is enough I say we stick with it by using os.Interrupt here; if you know you (or others) will want SIGTERM for sure then I think it's OK to commit and fix things later if it causes a real problem for anyone.
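
A minimal, self-contained sketch of that suggestion, assuming nothing beyond the standard library (an illustration of the reviewer's point, not the code committed in this PR):

	package main

	import (
		"log"
		"os"
		"os/signal"
	)

	func main() {
		// Buffer the channel so a notification is not lost if we are not yet receiving.
		signals := make(chan os.Signal, 1)
		// os.Interrupt is syscall.SIGINT on every platform except plan9,
		// so no syscall import is needed for the portable case.
		signal.Notify(signals, os.Interrupt)

		<-signals
		log.Println("interrupt received, shutting down")
	}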


go func() {
<-signals
logging.Infof("Received TERM signal. Waiting up to %s before terminating.", *termTimeout)
Contributor:

Can we include the information that we are no longer accepting new connections? That way people can correlate logs in their application about not being able to connect with this log indicating that they should expect new connections to fail.
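
For example, the message could be extended along these lines (hypothetical wording, not the log line that was committed):

	logging.Infof("Received TERM signal. No longer accepting new connections; waiting up to %s before terminating.", *termTimeout)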


err := proxyClient.Shutdown(*termTimeout)

if err == nil {
os.Exit(0)
}
os.Exit(2)
}()

proxyClient.Run(connSrc)
}
33 changes: 23 additions & 10 deletions proxy/proxy/client.go
@@ -111,18 +111,15 @@ func (c *Client) Run(connSrc <-chan Conn) {
}

func (c *Client) handleConn(conn Conn) {
// Track connections count only if a maximum connections limit is set to avoid useless overhead
if c.MaxConnections > 0 {
active := atomic.AddUint64(&c.ConnectionsCounter, 1)
active := atomic.AddUint64(&c.ConnectionsCounter, 1)

// Deferred decrement of ConnectionsCounter upon connection closing
defer atomic.AddUint64(&c.ConnectionsCounter, ^uint64(0))
// Deferred decrement of ConnectionsCounter upon connection closing
defer atomic.AddUint64(&c.ConnectionsCounter, ^uint64(0))

if active > c.MaxConnections {
logging.Errorf("too many open connections (max %d)", c.MaxConnections)
conn.Conn.Close()
return
}
if active > c.MaxConnections && c.MaxConnections > 0 {
Contributor:

If we are storing to MaxConnections using the atomic package, we need to use the atomic package to read the variable or else it's not guaranteed to be data-race safe. Also, a little nit-picky, but I suggest reversing the order of the checks so that the MaxConnections is checked for nonzero before checking the other comparison. Something like:

if max := atomic.LoadUint64(&c.MaxConnections); max > 0 && active > max {

Contributor:

nit: order of operation for these booleans:

if c.MaxConnections > 0 && active > c.MaxConnections {
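
Putting both suggestions together, a hedged sketch of what the guard in handleConn could look like (not the committed code; it assumes MaxConnections is also written via the atomic package elsewhere):

	active := atomic.AddUint64(&c.ConnectionsCounter, 1)
	// Decrement the counter again when this connection is done.
	defer atomic.AddUint64(&c.ConnectionsCounter, ^uint64(0))

	if max := atomic.LoadUint64(&c.MaxConnections); max > 0 && active > max {
		logging.Errorf("too many open connections (max %d)", max)
		conn.Conn.Close()
		return
	}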

logging.Errorf("too many open connections (max %d)", c.MaxConnections)
Contributor:

Can we have a different message in the case that we blocked the connection because we are shutting down? Otherwise these logs during shutdown might be a little confusing.

Contributor:

Resolved because the "block connections" feature was dropped.

conn.Conn.Close()
return
}

server, err := c.Dial(conn.Instance)
@@ -323,3 +320,19 @@ func NewConnSrc(instance string, l net.Listener) <-chan Conn {
}()
return ch
}

// Shutdown waits up to a given amount of time for all active connections to
// close. Returns an error if there are still active connections after waiting
// for the whole length of the timeout.
func (c *Client) Shutdown(termTimeout time.Duration) error {
termTime := time.Now().Add(termTimeout)
for termTime.After(time.Now()) && atomic.LoadUint64(&c.ConnectionsCounter) > 0 {
time.Sleep(1)
}

active := atomic.LoadUint64(&c.ConnectionsCounter)
if active == 0 {
return nil
}
return fmt.Errorf("%d active connections still exist after waiting for %v", active, termTimeout)
}
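
Note that time.Sleep(1) sleeps for one nanosecond, so the loop above effectively spins until the deadline or until the counter reaches zero. A hedged alternative sketch (hypothetical, not part of this PR) that polls on a coarser interval with the same semantics:

	// shutdownWithTicker is a hypothetical variant of Shutdown that checks the
	// connection counter every 100ms instead of spinning.
	func (c *Client) shutdownWithTicker(termTimeout time.Duration) error {
		deadline := time.Now().Add(termTimeout)
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for {
			if atomic.LoadUint64(&c.ConnectionsCounter) == 0 {
				return nil
			}
			if !time.Now().Before(deadline) {
				active := atomic.LoadUint64(&c.ConnectionsCounter)
				return fmt.Errorf("%d active connections still exist after waiting for %v", active, termTimeout)
			}
			<-ticker.C
		}
	}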
32 changes: 32 additions & 0 deletions proxy/proxy/client_test.go
@@ -183,3 +183,35 @@ func TestMaximumConnectionsCount(t *testing.T) {
t.Errorf("client should have dialed exactly the maximum of %d connections (%d connections, %d dials)", maxConnections, numConnections, dials)
}
}

func TestShutdownTerminatesEarly(t *testing.T) {
b := &fakeCerts{}
c := &Client{
Certs: &blockingCertSource{
map[string]*fakeCerts{
instance: b,
}},
Dialer: func(string, string) (net.Conn, error) {
return nil, nil
},
}

shutdown := make(chan bool, 1)
go func() {
c.Shutdown(1)
shutdown <- true
}()

shutdownFinished := false

// In case the code is actually broken and the client doesn't shut down quickly, don't cause the test to hang until it times out.
select {
case <-time.After(100 * time.Millisecond):
case shutdownFinished = <-shutdown:
}

if !shutdownFinished {
t.Errorf("shutdown should have completed quickly because there are no active connections")
}

}
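
A complementary case, shown here only as a hedged sketch (hypothetical, not part of this PR), would check that Shutdown reports an error while a connection is still counted as active:

	func TestShutdownReportsActiveConnections(t *testing.T) {
		c := &Client{}

		// Simulate one connection that never closes.
		atomic.AddUint64(&c.ConnectionsCounter, 1)

		if err := c.Shutdown(10 * time.Millisecond); err == nil {
			t.Errorf("Shutdown should return an error while a connection is still active")
		}
	}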