Skip to content

Commit

Permalink
ccl/sqlproxyccl: enable a more graceful shutdown
Browse files Browse the repository at this point in the history
When a drainSignal is received, the SQL proxy now stops accepting new connections and
waits up to a time limit (59 minutes) for all existing connections to close before shutting down.

The next drainSignal will be ignored, but the third will forcefully shut down
the server by panicking. This is to resolve an issue with Kubernetes where
traffic could be lost during upgrades. See CC-5298 for more details.

Release notes: None
  • Loading branch information
PJ Tatlow committed Jun 6, 2022
1 parent fd26d19 commit 22bab19
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 5 deletions.
33 changes: 33 additions & 0 deletions pkg/ccl/sqlproxyccl/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
"github.com/cockroachdb/logtags"
)

var (
// awaitNoConnectionsInterval is how long AwaitNoConnections waits between
// successive checks of the current connection count. It is a variable
// rather than a constant so that tests can shorten it.
awaitNoConnectionsInterval = time.Minute
)

// Server is a TCP server that proxies SQL connections to a configurable
// backend. It may also run an HTTP server to expose a health check and
// prometheus metrics.
Expand Down Expand Up @@ -186,6 +190,35 @@ func (s *Server) Serve(ctx context.Context, ln net.Listener) error {
}
}

// AwaitNoConnections returns a channel that is closed once the server has no
// open connections. It is meant to be used after the server has stopped
// accepting new connections, while we wait to shut the server down without
// interrupting existing connections.
//
// The connection count is polled every awaitNoConnectionsInterval by an async
// task registered on the server's stopper. The returned channel is never
// closed if, before the count reaches zero, ctx is cancelled or the stopper
// starts quiescing: in both cases the task must end so that the stopper can
// finish stopping. The channel is likewise never closed if the stopper
// returns an error instead of starting the task. Callers should therefore
// always combine a receive on the returned channel with their own timeout or
// cancellation.
func (s *Server) AwaitNoConnections(ctx context.Context) <-chan struct{} {
	c := make(chan struct{})

	// The error is deliberately ignored: if the task could not be started the
	// channel simply stays open, which is the documented behavior when the
	// server is already shutting down.
	_ = s.Stopper.RunAsyncTask(ctx, "await-no-connections", func(context.Context) {
		for {
			if s.metrics.CurConnCount.Value() == 0 {
				close(c)
				return
			}
			select {
			case <-ctx.Done():
				return
			case <-s.Stopper.ShouldQuiesce():
				// Without this case a call to Stopper.Stop would have to wait
				// for the remainder of the polling interval before this task
				// noticed that it should exit.
				return
			case <-time.After(awaitNoConnectionsInterval):
				// Poll the connection count again.
			}
		}
	})

	return c
}

// proxyConn is a SQL connection into the proxy.
type proxyConn struct {
net.Conn
Expand Down
33 changes: 33 additions & 0 deletions pkg/ccl/sqlproxyccl/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -61,3 +63,34 @@ func TestHandleVars(t *testing.T) {

require.Contains(t, string(out), "# HELP proxy_sql_conns")
}

// TestAwaitNoConnections checks that the channel returned by
// AwaitNoConnections is not closed until the connection count metric has
// dropped back to zero.
func TestAwaitNoConnections(t *testing.T) {
	defer leaktest.AfterTest(t)()
	ctx := context.Background()
	stopper := stop.NewStopper()
	defer stopper.Stop(ctx)

	// Poll every millisecond for the duration of the test, restoring the
	// package-level interval on the way out.
	defer func(saved time.Duration) {
		awaitNoConnectionsInterval = saved
	}(awaitNoConnectionsInterval)
	awaitNoConnectionsInterval = time.Millisecond

	srv, err := NewServer(ctx, stopper, ProxyOptions{})
	require.NoError(t, err)

	// How long the simulated connection stays open.
	const holdFor = 150 * time.Millisecond

	// Simulate a connection coming in.
	srv.metrics.CurConnCount.Inc(1)
	start := timeutil.Now()

	// Simulate the connection dropping once holdFor has elapsed.
	_ = stopper.RunAsyncTask(ctx, "decrement-con-count", func(context.Context) {
		<-time.After(holdFor)
		srv.metrics.CurConnCount.Dec(1)
	})

	// Block until the server reports that no connections remain.
	noConns := srv.AwaitNoConnections(ctx)
	<-noConns

	// The wait must have lasted at least as long as the connection was held.
	elapsed := timeutil.Since(start)
	require.GreaterOrEqual(t, elapsed, holdFor)
}
34 changes: 29 additions & 5 deletions pkg/cli/mt_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"net"
"os"
"os/signal"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl"
"github.com/cockroachdb/cockroach/pkg/cli/clierrorplus"
Expand All @@ -27,6 +28,13 @@ import (
"github.com/spf13/cobra"
)

const (
// shutdownConnectionTimeout is the maximum amount of time we will wait,
// once shutdown has begun and the TCP listener has been closed, for all
// connections to be closed before forcefully closing them by shutting
// down the server.
shutdownConnectionTimeout = time.Minute * 59
)

var mtStartSQLProxyCmd = &cobra.Command{
Use: "start-proxy",
Short: "start a sql proxy",
Expand Down Expand Up @@ -85,7 +93,7 @@ func runStartSQLProxy(cmd *cobra.Command, args []string) (returnErr error) {
return err
}

return waitForSignals(ctx, stopper, errChan)
return waitForSignals(ctx, server, stopper, proxyLn, errChan)
}

func initLogging(cmd *cobra.Command) (ctx context.Context, stopper *stop.Stopper, err error) {
Expand All @@ -107,7 +115,11 @@ func initLogging(cmd *cobra.Command) (ctx context.Context, stopper *stop.Stopper
}

func waitForSignals(
ctx context.Context, stopper *stop.Stopper, errChan chan error,
ctx context.Context,
server *sqlproxyccl.Server,
stopper *stop.Stopper,
proxyLn net.Listener,
errChan chan error,
) (returnErr error) {
// Need to alias the signals if this has to run on non-unix OSes too.
signalCh := make(chan os.Signal, 1)
Expand All @@ -130,6 +142,17 @@ func waitForSignals(
returnErr = errors.New("interrupted")
}
go func() {
// Begin shutdown by:
// 1. Stopping the TCP listener so no new connections can be established
// 2. Waiting for all connections to close "naturally" or
// waiting for "shutdownConnectionTimeout" to elapse after which
// open TCP connections will be forcefully closed so the server can stop
log.Infof(ctx, "stopping tcp listener")
_ = proxyLn.Close()
select {
case <-server.AwaitNoConnections(ctx):
case <-time.After(shutdownConnectionTimeout):
}
log.Infof(ctx, "server stopping")
stopper.Stop(ctx)
}()
Expand All @@ -138,12 +161,13 @@ func waitForSignals(
select {} // Block and wait for logging go routine to shut down the process
}

numInturrupts := 0
for {
select {
case sig := <-signalCh:
switch sig {
case os.Interrupt: // SIGTERM after SIGTERM
log.Ops.Infof(ctx, "received additional signal '%s'; continuing graceful shutdown", sig)
if numInturrupts == 0 {
numInturrupts++
log.Ops.Infof(ctx, "received additional signal '%s'; continuing graceful shutdown. Next signal will force shutdown.", sig)
continue
}

Expand Down

0 comments on commit 22bab19

Please sign in to comment.