From 22bab193bcf06d20838d9dd5090019ed420bc62c Mon Sep 17 00:00:00 2001 From: PJ Tatlow <pj@cockroachlabs.com> Date: Wed, 1 Jun 2022 16:30:10 -0600 Subject: [PATCH] ccl/sqlproxyccl: enable a more graceful shutdown When a drainSignal is received, the sql proxy now waits for all connections to close within a certain time limit (59 minutes) before shutting down. The next drainSignal will be ignored, but the third will forcefully shut down the server by panicking. This is to resolve an issue with Kubernetes where traffic could be lost during upgrades. See CC-5298 for more details. Release notes: None --- pkg/ccl/sqlproxyccl/server.go | 33 +++++++++++++++++++++++++++++ pkg/ccl/sqlproxyccl/server_test.go | 33 +++++++++++++++++++++++++++++ pkg/cli/mt_proxy.go | 34 +++++++++++++++++++++++++----- 3 files changed, 95 insertions(+), 5 deletions(-) diff --git a/pkg/ccl/sqlproxyccl/server.go b/pkg/ccl/sqlproxyccl/server.go index 3abe481d36ea..3ff9350f6a8e 100644 --- a/pkg/ccl/sqlproxyccl/server.go +++ b/pkg/ccl/sqlproxyccl/server.go @@ -26,6 +26,10 @@ import ( "github.com/cockroachdb/logtags" ) +var ( + awaitNoConnectionsInterval = time.Minute +) + // Server is a TCP server that proxies SQL connections to a configurable // backend. It may also run an HTTP server to expose a health check and // prometheus metrics. @@ -186,6 +190,35 @@ func (s *Server) Serve(ctx context.Context, ln net.Listener) error { } } +// AwaitNoConnections returns a channel that is closed once the server has no open connections. +// This is meant to be used after the server has stopped accepting new connections and we are +// waiting to shutdown the server without inturrupting existing connections +// +// If the context is cancelled the channel will never close because we have to end the async task +// to allow the stopper to completely finish +func (s *Server) AwaitNoConnections(ctx context.Context) <-chan struct{} { + c := make(chan struct{}) + + _ = s.Stopper.RunAsyncTask(ctx, "await-no-connections", func(context.Context) { + for { + connCount := s.metrics.CurConnCount.Value() + if connCount == 0 { + close(c) + break + } + select { + case <-ctx.Done(): + return + case <-time.After(awaitNoConnectionsInterval): + continue + } + } + + }) + + return c +} + // proxyConn is a SQL connection into the proxy. type proxyConn struct { net.Conn diff --git a/pkg/ccl/sqlproxyccl/server_test.go b/pkg/ccl/sqlproxyccl/server_test.go index c20491b07780..b6fc636ef5d0 100644 --- a/pkg/ccl/sqlproxyccl/server_test.go +++ b/pkg/ccl/sqlproxyccl/server_test.go @@ -14,9 +14,11 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) @@ -61,3 +63,34 @@ func TestHandleVars(t *testing.T) { require.Contains(t, string(out), "# HELP proxy_sql_conns") } + +func TestAwaitNoConnections(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + originalInterval := awaitNoConnectionsInterval + awaitNoConnectionsInterval = time.Millisecond + defer func() { + awaitNoConnectionsInterval = originalInterval + }() + + proxyServer, err := NewServer(ctx, stopper, ProxyOptions{}) + require.NoError(t, err) + + //simulate a connection coming in + proxyServer.metrics.CurConnCount.Inc(1) + begin := timeutil.Now() + + // wait a few milliseconds and simulate the connection dropping + waitTime := time.Millisecond * 150 + _ = stopper.RunAsyncTask(ctx, "decrement-con-count", func(context.Context) { + <-time.After(waitTime) + proxyServer.metrics.CurConnCount.Dec(1) + }) + // wait for there to be no connections + <-proxyServer.AwaitNoConnections(ctx) + // make sure we waited for the connection to be dropped + require.GreaterOrEqual(t, timeutil.Since(begin), waitTime) +} diff --git a/pkg/cli/mt_proxy.go b/pkg/cli/mt_proxy.go index dff7bbf49aee..231e7da54697 100644 --- a/pkg/cli/mt_proxy.go +++ b/pkg/cli/mt_proxy.go @@ -16,6 +16,7 @@ import ( "net" "os" "os/signal" + "time" "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl" "github.com/cockroachdb/cockroach/pkg/cli/clierrorplus" @@ -27,6 +28,13 @@ import ( "github.com/spf13/cobra" ) +const ( + // shutdownConnectionTimeout is the maximum amount of time we will wait + // for all connections to be closed before forcefully closing them by + // shutting down the server + shutdownConnectionTimeout = time.Minute * 59 +) + var mtStartSQLProxyCmd = &cobra.Command{ Use: "start-proxy", Short: "start a sql proxy", @@ -85,7 +93,7 @@ func runStartSQLProxy(cmd *cobra.Command, args []string) (returnErr error) { return err } - return waitForSignals(ctx, stopper, errChan) + return waitForSignals(ctx, server, stopper, proxyLn, errChan) } func initLogging(cmd *cobra.Command) (ctx context.Context, stopper *stop.Stopper, err error) { @@ -107,7 +115,11 @@ func initLogging(cmd *cobra.Command) (ctx context.Context, stopper *stop.Stopper } func waitForSignals( - ctx context.Context, stopper *stop.Stopper, errChan chan error, + ctx context.Context, + server *sqlproxyccl.Server, + stopper *stop.Stopper, + proxyLn net.Listener, + errChan chan error, ) (returnErr error) { // Need to alias the signals if this has to run on non-unix OSes too. signalCh := make(chan os.Signal, 1) @@ -130,6 +142,17 @@ func waitForSignals( returnErr = errors.New("interrupted") } go func() { + // Begin shutdown by: + // 1. Stopping the TCP listener so no new connections can be established + // 2. Waiting for all connections to close "naturally" or + // waiting for "shutdownConnectionTimeout" to elapse after which + // open TCP connections will be forcefully closed so the server can stop + log.Infof(ctx, "stopping tcp listener") + _ = proxyLn.Close() + select { + case <-server.AwaitNoConnections(ctx): + case <-time.After(shutdownConnectionTimeout): + } log.Infof(ctx, "server stopping") stopper.Stop(ctx) }() @@ -138,12 +161,13 @@ func waitForSignals( select {} // Block and wait for logging go routine to shut down the process } + numInturrupts := 0 for { select { case sig := <-signalCh: - switch sig { - case os.Interrupt: // SIGTERM after SIGTERM - log.Ops.Infof(ctx, "received additional signal '%s'; continuing graceful shutdown", sig) + if numInturrupts == 0 { + numInturrupts++ + log.Ops.Infof(ctx, "received additional signal '%s'; continuing graceful shutdown. Next signal will force shutdown.", sig) continue }