From 18eac87ca67b90224df64a1812e9d000bd4911e7 Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Fri, 10 Mar 2023 13:35:13 -0500 Subject: [PATCH 1/3] cli: compute --drain-wait based on cluster setting values Release note (cli change): The --drain-wait argument for the `drain` command will be automatically increased if the command detects that it is smaller than the sum of server.shutdown.drain_wait, server.shutdown.connection_wait, server.shutdown.query_wait times two, and server.shutdown.lease_transfer_wait. If the --drain-wait argument is 0, then no timeout is used. This recommendation was already documented, but now the advice will be applied automatically. --- pkg/cli/rpc_node_shutdown.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/pkg/cli/rpc_node_shutdown.go b/pkg/cli/rpc_node_shutdown.go index c008b767b058..4cb765bc7132 100644 --- a/pkg/cli/rpc_node_shutdown.go +++ b/pkg/cli/rpc_node_shutdown.go @@ -64,6 +64,39 @@ func doDrain( return doDrainNoTimeout(ctx, c, targetNode) } + shutdownSettings, err := c.Settings(ctx, &serverpb.SettingsRequest{ + Keys: []string{ + "server.shutdown.drain_wait", + "server.shutdown.connection_wait", + "server.shutdown.query_wait", + "server.shutdown.lease_transfer_wait", + }, + UnredactedValues: true, + }) + if err != nil { + return false, true, err + } + + // Add an extra buffer of 10 seconds for the timeout. + minWait := 10 * time.Second + for k, v := range shutdownSettings.KeyValues { + wait, err := time.ParseDuration(v.Value) + if err != nil { + return false, true, err + } + minWait += wait + // query_wait is used twice during draining, so count it twice here. + if k == "server.shutdown.query_wait" { + minWait += wait + } + } + if minWait > drainCtx.drainWait { + log.Infof(ctx, "--drain-wait is %s, but the server.shutdown.{drain,query,connection,lease_transfer}_wait "+ + "cluster settings require a value of at least %s; using the larger value", + drainCtx.drainWait, minWait) + drainCtx.drainWait = minWait + } + err = contextutil.RunWithTimeout(ctx, "drain", drainCtx.drainWait, func(ctx context.Context) (err error) { hardError, remainingWork, err = doDrainNoTimeout(ctx, c, targetNode) return err From 70418f63bf42f9f634779e079c92c409baaaf864 Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Sat, 11 Mar 2023 18:02:34 -0500 Subject: [PATCH 2/3] sql: fix check for closing connExecutor during draining This fixes a minor bug in which the connection would not get closed at the earliest possible time during server shutdown. The connection is supposed to be closed as soon as we handle a Sync message when the conn_executor is in the draining state and not in a transaction. Since the transaction state was checked before state transitions occurred, this would cause the connection to remain open for an extra bit of time. This was particularly a problem because the Sync message is also the command that auto-commits an implicit transaction. So before this commit, it was actually impossible for the check to work as it was supposed to. Now we check the txn state after state transitions occur. Release note: None --- pkg/sql/conn_executor.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 9dfd019faa65..9facd8a25f35 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -1911,6 +1911,7 @@ func (ex *connExecutor) run( var err error if err = ex.execCmd(); err != nil { + // Both of these errors are normal ways for the connExecutor to exit. if errors.IsAny(err, io.EOF, errDrainingComplete) { return nil } @@ -1934,7 +1935,7 @@ var errDrainingComplete = fmt.Errorf("draining done. this is a good time to fini // Returns drainingComplete if the session should finish because draining is // complete (i.e. we received a DrainRequest - possibly previously - and the // connection is found to be idle). -func (ex *connExecutor) execCmd() error { +func (ex *connExecutor) execCmd() (retErr error) { ctx := ex.Ctx() cmd, pos, err := ex.stmtBuf.CurCmd() if err != nil { @@ -2121,16 +2122,16 @@ func (ex *connExecutor) execCmd() error { // Note that the Sync result will flush results to the network connection. res = ex.clientComm.CreateSyncResult(pos) if ex.draining { - // If we're draining, check whether this is a good time to finish the + // If we're draining, then after handing the Sync connExecutor state + // transition, check whether this is a good time to finish the // connection. If we're not inside a transaction, we stop processing // now. If we are inside a transaction, we'll check again the next time // a Sync is processed. - if ex.idleConn() { - // If we're about to close the connection, close res in order to flush - // now, as we won't have an opportunity to do it later. - res.Close(ctx, stateToTxnStatusIndicator(ex.machine.CurState())) - return errDrainingComplete - } + defer func() { + if ex.idleConn() { + retErr = errDrainingComplete + } + }() } case CopyIn: ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionQueryReceived, tcmd.TimeReceived) From d6cdd94ab323315357af6134ed594fe278bbeb78 Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Fri, 10 Mar 2023 15:18:20 -0500 Subject: [PATCH 3/3] roachtest: enhance drain test The test now does much more: - Checks that --drain-wait is automatically increased if it is set lower than the cluster settings require. - Check that the /health?ready=1 endpoint fails during the drain_wait period. - Check for the proper error message during the connection_wait phase. - Check for the proper error message when trying to begin a new query/transaction during the query_wait phase. - Check that an open transaction is allowed to continue during the query_wait phase. - Check for the proper error message when a query is canceled during shutdown. Release note: None --- pkg/cli/rpc_node_shutdown.go | 4 +- pkg/cmd/roachtest/tests/drain.go | 179 ++++++++++++++++++++++++------- pkg/sql/pgwire/conn.go | 10 +- pkg/sql/sqlerrors/errors.go | 2 +- 4 files changed, 150 insertions(+), 45 deletions(-) diff --git a/pkg/cli/rpc_node_shutdown.go b/pkg/cli/rpc_node_shutdown.go index 4cb765bc7132..5abb915f4453 100644 --- a/pkg/cli/rpc_node_shutdown.go +++ b/pkg/cli/rpc_node_shutdown.go @@ -91,8 +91,8 @@ func doDrain( } } if minWait > drainCtx.drainWait { - log.Infof(ctx, "--drain-wait is %s, but the server.shutdown.{drain,query,connection,lease_transfer}_wait "+ - "cluster settings require a value of at least %s; using the larger value", + fmt.Fprintf(stderr, "warning: --drain-wait is %s, but the server.shutdown.{drain,query,connection,lease_transfer}_wait "+ + "cluster settings require a value of at least %s; using the larger value\n", drainCtx.drainWait, minWait) drainCtx.drainWait = minWait } diff --git a/pkg/cmd/roachtest/tests/drain.go b/pkg/cmd/roachtest/tests/drain.go index 265b8bad0ddc..e43740e38154 100644 --- a/pkg/cmd/roachtest/tests/drain.go +++ b/pkg/cmd/roachtest/tests/drain.go @@ -14,8 +14,11 @@ import ( "context" gosql "database/sql" "fmt" + "io" "math/rand" + "net/http" "path/filepath" + "strings" "time" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" @@ -23,8 +26,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/roachprod/install" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/httputil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" + "github.com/jackc/pgx/v4" "github.com/stretchr/testify/require" ) @@ -56,20 +62,28 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl var err error const ( // Set the duration of each phase of the draining period. - drainWaitDuration = 5 * time.Second + drainWaitDuration = 10 * time.Second connectionWaitDuration = 100 * time.Second queryWaitDuration = 10 * time.Second - // pokeDuringConnWaitTimestamp is the timestamp after the server - // starts waiting for SQL connections to close (with the start of the whole - // draining process marked as timestamp 0). It should be set larger than - // drainWaitDuration, but smaller than (drainWaitDuration + - // connectionWaitDuration). - pokeDuringConnWaitTimestamp = 15 * time.Second - connMaxLifetime = 10 * time.Second - connMaxCount = 5 - nodeToDrain = 1 + + // server.shutdown.lease_transfer_wait defaults to 5 seconds. + leaseTransferWaitDuration = 5 * time.Second + + // pokeDuringDrainWaitDelay is the amount of time after drain begins when we + // will check that the server is reporting itself as unhealthy to load + // balancers. It should be set smaller than drainWaitDuration. + pokeDuringDrainWaitDelay = 5 * time.Second + + // pokeDuringConnWaitDelay is the amount of time after drain begins when we + // will check that the server is waiting for SQL connections to close. It + // should be set larger than drainWaitDuration, but smaller than + // (drainWaitDuration + connectionWaitDuration). + pokeDuringConnWaitDelay = 20 * time.Second + + connMaxLifetime = 10 * time.Second + connMaxCount = 5 + nodeToDrain = 1 ) - totalWaitDuration := drainWaitDuration + connectionWaitDuration + queryWaitDuration prepareCluster(ctx, t, c, drainWaitDuration, connectionWaitDuration, queryWaitDuration) @@ -84,9 +98,7 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl // Get two connections from the connection pools. for j := 0; j < 2; j++ { conn, err := db.Conn(ctx) - require.NoError(t, err, "failed to a SQL connection from the connection pool") - conns = append(conns, conn) } @@ -95,32 +107,71 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl m.Go(func(ctx context.Context) error { t.Status(fmt.Sprintf("start draining node %d", nodeToDrain)) - return c.RunE(ctx, + results, err := c.RunWithDetailsSingleNode( + ctx, + t.L(), c.Node(nodeToDrain), - fmt.Sprintf("./cockroach node drain --insecure --drain-wait=%fs", - totalWaitDuration.Seconds())) + // --drain-wait is set to a low value so that we can confirm that it + // gets automatically upgraded to use a higher value larger than the sum + // of server.shutdown.drain_wait, server.shutdown.connection_wait, + // server.shutdown.query_wait times two, and + // server.shutdown.lease_transfer_wait. + "./cockroach node drain --self --insecure --drain-wait=10s", + ) + if err != nil { + return err + } + + expectedDrain := drainWaitDuration + connectionWaitDuration + queryWaitDuration*2 + leaseTransferWaitDuration + 10*time.Second + if !strings.Contains( + results.Stderr, + fmt.Sprintf( + "cluster settings require a value of at least %s; using the larger value", + expectedDrain), + ) { + return errors.Newf("expected --drain-wait to be upgraded to %s", expectedDrain) + } + + return nil }) drainStartTimestamp := timeutil.Now() + // Sleep till the server is in the status of reporting itself as unhealthy. + // Verify that the server is still allowing new SQL connections now. + time.Sleep(pokeDuringDrainWaitDelay) + t.Status(fmt.Sprintf("%s after draining starts, health check returns false", pokeDuringDrainWaitDelay)) + conn, err := db.Conn(ctx) + require.NoError(t, err, "new SQL connection should be allowed during drain_wait") + err = conn.Close() + require.NoError(t, err) + addr, err := c.ExternalAdminUIAddr(ctx, t.L(), c.Node(nodeToDrain)) + require.NoError(t, err) + url := `http://` + addr[0] + `/health?ready=1` + resp, err := httputil.Get(ctx, url) + require.NoError(t, err) + defer func() { _ = resp.Body.Close() }() + require.Equalf(t, http.StatusServiceUnavailable, resp.StatusCode, "expected healthcheck to fail during drain") + bodyBytes, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Contains(t, string(bodyBytes), "node is shutting down") + // Sleep till the server is in the status of waiting for users to close SQL // connections. Verify that the server is rejecting new SQL connections now. - time.Sleep(pokeDuringConnWaitTimestamp) - + time.Sleep(drainStartTimestamp.Add(pokeDuringConnWaitDelay).Sub(timeutil.Now())) + t.Status(fmt.Sprintf("%s after draining starts, server is rejecting new SQL connections", pokeDuringConnWaitDelay)) if _, err := db.Conn(ctx); err != nil { - t.Status(fmt.Sprintf("%s after draining starts, server is rejecting "+ - "new SQL connections: %v", pokeDuringConnWaitTimestamp, err)) + require.ErrorContains(t, err, "server is not accepting clients, try another node") } else { t.Fatal(errors.New("new SQL connections should not be allowed when the server " + "starts waiting for the user to close SQL connections")) } - require.Equalf(t, db.Stats().OpenConnections, 2, "number of open connections should be 2") - t.Status("number of open connections: ", db.Stats().OpenConnections) + require.Equalf(t, db.Stats().OpenConnections, 2, "number of open connections should be 2") randConn := conns[rand.Intn(len(conns))] - t.Status("execting sql query with connection %s", randConn) + t.Status("execing sql query with connection") // When server is waiting clients to close connections, verify that SQL // queries do not fail. @@ -134,17 +185,18 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl "expected connection to be able to be successfully closed client-side") } - t.Status("all SQL connections are put back to the connection pool") + t.Status("all SQL connections are closed") err = m.WaitE() require.NoError(t, err, "error waiting for the draining to finish") drainEndTimestamp := timeutil.Now() - actualDrainDuration := drainEndTimestamp.Sub(drainStartTimestamp).Seconds() + actualDrainDuration := drainEndTimestamp.Sub(drainStartTimestamp) t.L().Printf("the draining lasted %f seconds", actualDrainDuration) - if actualDrainDuration >= float64(totalWaitDuration)-10 { + totalWaitDuration := drainWaitDuration + connectionWaitDuration + queryWaitDuration + if actualDrainDuration >= totalWaitDuration-10*time.Second { t.Fatal(errors.New("the draining process didn't early exit " + "when waiting for server to close all SQL connections")) } @@ -160,30 +212,81 @@ func runTestWarningForConnWait(ctx context.Context, t test.Test, c cluster.Clust drainWaitDuration = 0 * time.Second connectionWaitDuration = 10 * time.Second queryWaitDuration = 20 * time.Second - nodeToDrain = 1 + // pokeDuringQueryWaitDelay is the amount of time after drain begins when we + // will check that the server does not allow any new query to begin on a + // connection that is still open. It + // should be set larger than (drainWaitDuration + connectionWaitDuration), + // but smaller than + // (drainWaitDuration + connectionWaitDuration + queryWaitDuration). + pokeDuringQueryWaitDelay = 15 * time.Second + nodeToDrain = 1 ) - totalWaitDuration := drainWaitDuration + connectionWaitDuration + queryWaitDuration - prepareCluster(ctx, t, c, drainWaitDuration, connectionWaitDuration, queryWaitDuration) - db := c.Conn(ctx, t.L(), nodeToDrain) - defer db.Close() - - // Get a connection from the connection pool. - _, err = db.Conn(ctx) - - require.NoError(t, err, "cannot get a SQL connection from the connection pool") + pgURL, err := c.ExternalPGUrl(ctx, t.L(), c.Node(nodeToDrain), "" /* tenant */) + require.NoError(t, err) + connNoTxn, err := pgx.Connect(ctx, pgURL[0]) + require.NoError(t, err) + connWithTxn, err := pgx.Connect(ctx, pgURL[0]) + require.NoError(t, err) + connWithSleep, err := pgx.Connect(ctx, pgURL[0]) + require.NoError(t, err) m := c.NewMonitor(ctx, c.Node(nodeToDrain)) m.Go(func(ctx context.Context) error { t.Status(fmt.Sprintf("draining node %d", nodeToDrain)) return c.RunE(ctx, c.Node(nodeToDrain), - fmt.Sprintf("./cockroach node drain --insecure --drain-wait=%fs", - totalWaitDuration.Seconds())) + "./cockroach node drain --self --insecure --drain-wait=600s", + ) }) + // The connection should work still. + var result int + err = connNoTxn.QueryRow(ctx, "SELECT 1").Scan(&result) + require.NoError(t, err) + require.Equal(t, 1, result) + + // A query that takes longer than the total wait duration should be canceled. + m.Go(func(ctx context.Context) error { + _, err := connWithSleep.Exec(ctx, "SELECT pg_sleep(200)") + if testutils.IsError(err, "(query execution canceled|server is shutting down|connection reset by peer|unexpected EOF)") { + return nil + } + if err == nil { + return errors.New("expected pg_sleep query to fail") + } + return errors.Wrapf(err, "expected pg_sleep query to fail; but got the wrong error") + }) + + // Start a txn before the query_wait period begins. + tx, err := connWithTxn.Begin(ctx) + require.NoError(t, err) + + time.Sleep(pokeDuringQueryWaitDelay) + t.Status(fmt.Sprintf("%s after draining starts, server is rejecting new "+ + "queries on existing SQL connections", pokeDuringQueryWaitDelay)) + + // A connection with no open transaction should have been closed + // automatically by this point. + err = connNoTxn.QueryRow(ctx, "SELECT 1").Scan(&result) + if !testutils.IsError(err, "(server is shutting down|connection reset by peer|unexpected EOF)") { + require.FailNowf(t, "expected error from trying to use an idle connection", "the actual error was %v", err) + } + + // A transaction that was already open should still work. + err = tx.QueryRow(ctx, "SELECT 2").Scan(&result) + require.NoError(t, err) + require.Equal(t, 2, result) + err = tx.Commit(ctx) + require.NoError(t, err) + // And that connection should be closed right after the commit happens. + _, err = connWithTxn.Exec(ctx, "SELECT 1") + if !testutils.IsError(err, "(server is shutting down|connection reset by peer|unexpected EOF)") { + require.FailNowf(t, "expected error from trying to use an idle connection", "the actual error was %v", err) + } + err = m.WaitE() require.NoError(t, err, "error waiting for the draining to finish") diff --git a/pkg/sql/pgwire/conn.go b/pkg/sql/pgwire/conn.go index f20b6ed909d4..853ea614616c 100644 --- a/pkg/sql/pgwire/conn.go +++ b/pkg/sql/pgwire/conn.go @@ -598,10 +598,12 @@ func (c *conn) serveImpl( // If we're draining, let the client know by piling on an AdminShutdownError // and flushing the buffer. if draining() { - // TODO(andrei): I think sending this extra error to the client if we also - // sent another error for the last query (like a context canceled) is a bad - // idea; see #22630. I think we should find a way to return the - // AdminShutdown error as the only result of the query. + // The error here is also sent with pgcode.AdminShutdown, to indicate that + // the connection is being closed. Clients are expected to be able to handle + // this even when not waiting for a query result. See the discussion at + // https://github.com/cockroachdb/cockroach/issues/22630. + // NOTE: If a query is canceled due to draining, the conn_executor already + // will have sent a QueryCanceled error as a response to the query. log.Ops.Info(ctx, "closing existing connection while server is draining") _ /* err */ = c.writeErr(ctx, newAdminShutdownErr(ErrDrainingExistingConn), &c.writerState.buf) _ /* n */, _ /* err */ = c.writerState.buf.WriteTo(c.conn) diff --git a/pkg/sql/sqlerrors/errors.go b/pkg/sql/sqlerrors/errors.go index d308c3bce827..c3f8d04f379c 100644 --- a/pkg/sql/sqlerrors/errors.go +++ b/pkg/sql/sqlerrors/errors.go @@ -344,7 +344,7 @@ func NewInvalidVolatilityError(err error) error { var QueryTimeoutError = pgerror.New( pgcode.QueryCanceled, "query execution canceled due to statement timeout") -// TxnTimeoutError is an error representing a query timeout. +// TxnTimeoutError is an error representing a transasction timeout. var TxnTimeoutError = pgerror.New( pgcode.QueryCanceled, "query execution canceled due to transaction timeout")