From d2f38303be7c75cf7e695d50be75a2632efa8962 Mon Sep 17 00:00:00 2001
From: Rafi Shamim
Date: Fri, 10 Mar 2023 15:18:20 -0500
Subject: [PATCH] roachtest: enhance drain test

The test now does much more:
- Check that --drain-wait is automatically increased if it is set lower
  than the cluster settings require.
- Check that the /health?ready=1 endpoint fails during the drain_wait
  period.
- Check for the proper error message during the connection_wait phase.
- Check for the proper error message when trying to begin a new
  query/transaction during the query_wait phase.
- Check that an open transaction is allowed to continue during the
  query_wait phase.
- Check for the proper error message when a query is canceled during
  shutdown.

Release note: None
---
 pkg/cli/rpc_node_shutdown.go     |   4 +-
 pkg/cmd/roachtest/tests/drain.go | 179 ++++++++++++++++++++++++-------
 pkg/sql/pgwire/conn.go           |  10 +-
 pkg/sql/sqlerrors/errors.go      |   2 +-
 4 files changed, 150 insertions(+), 45 deletions(-)

diff --git a/pkg/cli/rpc_node_shutdown.go b/pkg/cli/rpc_node_shutdown.go
index 96d5c910de93..e8cf99f4b88d 100644
--- a/pkg/cli/rpc_node_shutdown.go
+++ b/pkg/cli/rpc_node_shutdown.go
@@ -89,8 +89,8 @@ func doDrain(
     }
   }
   if minWait > drainCtx.drainWait {
-    log.Infof(ctx, "--drain-wait is %s, but the server.shutdown.{drain,query,connection,lease_transfer}_wait "+
-      "cluster settings require a value of at least %s; using the larger value",
+    fmt.Fprintf(stderr, "warning: --drain-wait is %s, but the server.shutdown.{drain,query,connection,lease_transfer}_wait "+
+      "cluster settings require a value of at least %s; using the larger value\n",
       drainCtx.drainWait, minWait)
     drainCtx.drainWait = minWait + 10*time.Second
   }
diff --git a/pkg/cmd/roachtest/tests/drain.go b/pkg/cmd/roachtest/tests/drain.go
index 265b8bad0ddc..106f2d098594 100644
--- a/pkg/cmd/roachtest/tests/drain.go
+++ b/pkg/cmd/roachtest/tests/drain.go
@@ -14,8 +14,11 @@ import (
   "context"
   gosql "database/sql"
   "fmt"
+  "io"
   "math/rand"
+  "net/http"
   "path/filepath"
+  "strings"
   "time"

   "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
@@ -23,8 +26,11 @@ import (
   "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
   "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
   "github.com/cockroachdb/cockroach/pkg/roachprod/install"
+  "github.com/cockroachdb/cockroach/pkg/testutils"
+  "github.com/cockroachdb/cockroach/pkg/util/httputil"
   "github.com/cockroachdb/cockroach/pkg/util/timeutil"
   "github.com/cockroachdb/errors"
+  "github.com/jackc/pgx/v4"
   "github.com/stretchr/testify/require"
 )

@@ -56,20 +62,28 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl
   var err error
   const (
     // Set the duration of each phase of the draining period.
-    drainWaitDuration = 5 * time.Second
+    drainWaitDuration = 10 * time.Second
     connectionWaitDuration = 100 * time.Second
     queryWaitDuration = 10 * time.Second
-    // pokeDuringConnWaitTimestamp is the timestamp after the server
-    // starts waiting for SQL connections to close (with the start of the whole
-    // draining process marked as timestamp 0). It should be set larger than
-    // drainWaitDuration, but smaller than (drainWaitDuration +
-    // connectionWaitDuration).
-    pokeDuringConnWaitTimestamp = 15 * time.Second
-    connMaxLifetime = 10 * time.Second
-    connMaxCount = 5
-    nodeToDrain = 1
+
+    // server.shutdown.lease_transfer_wait defaults to 5 seconds.
+    leaseTransferWaitDuration = 5 * time.Second
+
+    // pokeDuringDrainWaitDelay is the amount of time after drain begins when we
+    // will check that the server is reporting itself as unhealthy to load
+    // balancers. It should be set smaller than drainWaitDuration.
+    pokeDuringDrainWaitDelay = 5 * time.Second
+
+    // pokeDuringConnWaitDelay is the amount of time after drain begins when we
+    // will check that the server is waiting for SQL connections to close. It
+    // should be set larger than drainWaitDuration, but smaller than
+    // (drainWaitDuration + connectionWaitDuration).
+    pokeDuringConnWaitDelay = 20 * time.Second
+
+    connMaxLifetime = 10 * time.Second
+    connMaxCount = 5
+    nodeToDrain = 1
   )

-  totalWaitDuration := drainWaitDuration + connectionWaitDuration + queryWaitDuration
   prepareCluster(ctx, t, c, drainWaitDuration, connectionWaitDuration, queryWaitDuration)

@@ -84,9 +98,7 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl
   // Get two connections from the connection pools.
   for j := 0; j < 2; j++ {
     conn, err := db.Conn(ctx)
-    require.NoError(t, err, "failed to a SQL connection from the connection pool")
-
     conns = append(conns, conn)
   }

@@ -95,32 +107,71 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl
   m.Go(func(ctx context.Context) error {
     t.Status(fmt.Sprintf("start draining node %d", nodeToDrain))
-    return c.RunE(ctx,
+    results, err := c.RunWithDetailsSingleNode(
+      ctx,
+      t.L(),
       c.Node(nodeToDrain),
-      fmt.Sprintf("./cockroach node drain --insecure --drain-wait=%fs",
-        totalWaitDuration.Seconds()))
+      // --drain-wait is set to a low value so that we can confirm that it
+      // gets automatically increased to a value at least as large as the
+      // sum of server.shutdown.drain_wait, server.shutdown.connection_wait,
+      // server.shutdown.query_wait times two, and
+      // server.shutdown.lease_transfer_wait.
+      "./cockroach node drain --self --insecure --drain-wait=10s",
+    )
+    if err != nil {
+      return err
+    }
+
+    expectedDrain := drainWaitDuration + connectionWaitDuration + queryWaitDuration*2 + leaseTransferWaitDuration
+    if !strings.Contains(
+      results.Stderr,
+      fmt.Sprintf(
+        "cluster settings require a value of at least %s; using the larger value",
+        expectedDrain),
+    ) {
+      return errors.Newf("expected --drain-wait to be upgraded to %s", expectedDrain)
+    }
+
+    return nil
   })

   drainStartTimestamp := timeutil.Now()

+  // Sleep until the server starts reporting itself as unhealthy to load
+  // balancers. Verify that the server still allows new SQL connections now.
+  time.Sleep(pokeDuringDrainWaitDelay)
+  t.Status(fmt.Sprintf("%s after draining starts, health check returns false", pokeDuringDrainWaitDelay))
+  conn, err := db.Conn(ctx)
+  require.NoError(t, err, "new SQL connection should be allowed during drain_wait")
+  err = conn.Close()
+  require.NoError(t, err)
+  addr, err := c.ExternalAdminUIAddr(ctx, t.L(), c.Node(nodeToDrain))
+  require.NoError(t, err)
+  url := `http://` + addr[0] + `/health?ready=1`
+  resp, err := httputil.Get(ctx, url)
+  require.NoError(t, err)
+  defer func() { _ = resp.Body.Close() }()
+  require.Equalf(t, http.StatusServiceUnavailable, resp.StatusCode, "expected healthcheck to fail during drain")
+  bodyBytes, err := io.ReadAll(resp.Body)
+  require.NoError(t, err)
+  require.Contains(t, string(bodyBytes), "node is shutting down")
+
   // Sleep till the server is in the status of waiting for users to close SQL
   // connections. Verify that the server is rejecting new SQL connections now.
-  time.Sleep(pokeDuringConnWaitTimestamp)
-
+  time.Sleep(drainStartTimestamp.Add(pokeDuringConnWaitDelay).Sub(timeutil.Now()))
+  t.Status(fmt.Sprintf("%s after draining starts, server is rejecting new SQL connections", pokeDuringConnWaitDelay))
   if _, err := db.Conn(ctx); err != nil {
-    t.Status(fmt.Sprintf("%s after draining starts, server is rejecting "+
-      "new SQL connections: %v", pokeDuringConnWaitTimestamp, err))
+    require.ErrorContains(t, err, "server is not accepting clients, try another node")
   } else {
     t.Fatal(errors.New("new SQL connections should not be allowed when the server " +
       "starts waiting for the user to close SQL connections"))
   }

-  require.Equalf(t, db.Stats().OpenConnections, 2, "number of open connections should be 2")
-  t.Status("number of open connections: ", db.Stats().OpenConnections)
+  require.Equalf(t, db.Stats().OpenConnections, 2, "number of open connections should be 2")

   randConn := conns[rand.Intn(len(conns))]
-  t.Status("execting sql query with connection %s", randConn)
+  t.Status("executing sql query with connection")

   // When server is waiting clients to close connections, verify that SQL
   // queries do not fail.
@@ -134,17 +185,18 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl
       "expected connection to be able to be successfully closed client-side")
   }

-  t.Status("all SQL connections are put back to the connection pool")
+  t.Status("all SQL connections are closed")

   err = m.WaitE()
   require.NoError(t, err, "error waiting for the draining to finish")

   drainEndTimestamp := timeutil.Now()
-  actualDrainDuration := drainEndTimestamp.Sub(drainStartTimestamp).Seconds()
+  actualDrainDuration := drainEndTimestamp.Sub(drainStartTimestamp)

-  t.L().Printf("the draining lasted %f seconds", actualDrainDuration)
+  t.L().Printf("the draining lasted %f seconds", actualDrainDuration.Seconds())

-  if actualDrainDuration >= float64(totalWaitDuration)-10 {
+  totalWaitDuration := drainWaitDuration + connectionWaitDuration + queryWaitDuration
+  if actualDrainDuration >= totalWaitDuration-10*time.Second {
     t.Fatal(errors.New("the draining process didn't early exit " +
       "when waiting for server to close all SQL connections"))
   }
@@ -160,30 +212,81 @@ func runTestWarningForConnWait(ctx context.Context, t test.Test, c cluster.Clust
     drainWaitDuration = 0 * time.Second
     connectionWaitDuration = 10 * time.Second
     queryWaitDuration = 20 * time.Second
-    nodeToDrain = 1
+    // pokeDuringQueryWaitDelay is the amount of time after drain begins when
+    // we will check that the server does not allow any new query to begin on
+    // a connection that is still open. It should be set larger than
+    // (drainWaitDuration + connectionWaitDuration), but smaller than
+    // (drainWaitDuration + connectionWaitDuration + queryWaitDuration).
+    pokeDuringQueryWaitDelay = 15 * time.Second
+    nodeToDrain = 1
   )

-  totalWaitDuration := drainWaitDuration + connectionWaitDuration + queryWaitDuration
-
   prepareCluster(ctx, t, c, drainWaitDuration, connectionWaitDuration, queryWaitDuration)

-  db := c.Conn(ctx, t.L(), nodeToDrain)
-  defer db.Close()
-
-  // Get a connection from the connection pool.
-  _, err = db.Conn(ctx)
-
-  require.NoError(t, err, "cannot get a SQL connection from the connection pool")
+  pgURL, err := c.ExternalPGUrl(ctx, t.L(), c.Node(nodeToDrain), "" /* tenant */)
+  require.NoError(t, err)
+  connNoTxn, err := pgx.Connect(ctx, pgURL[0])
+  require.NoError(t, err)
+  connWithTxn, err := pgx.Connect(ctx, pgURL[0])
+  require.NoError(t, err)
+  connWithSleep, err := pgx.Connect(ctx, pgURL[0])
+  require.NoError(t, err)

   m := c.NewMonitor(ctx, c.Node(nodeToDrain))
   m.Go(func(ctx context.Context) error {
     t.Status(fmt.Sprintf("draining node %d", nodeToDrain))
     return c.RunE(ctx,
       c.Node(nodeToDrain),
-      fmt.Sprintf("./cockroach node drain --insecure --drain-wait=%fs",
-        totalWaitDuration.Seconds()))
+      "./cockroach node drain --self --insecure --drain-wait=600s",
+    )
   })

+  // The connection should still work.
+  var result int
+  err = connNoTxn.QueryRow(ctx, "SELECT 1").Scan(&result)
+  require.NoError(t, err)
+  require.Equal(t, 1, result)
+
+  // A query that takes longer than the total wait duration should be canceled.
+  m.Go(func(ctx context.Context) error {
+    _, err := connWithSleep.Exec(ctx, "SELECT pg_sleep(200)")
+    if testutils.IsError(err, "(query execution canceled|server is shutting down|connection reset by peer|unexpected EOF)") {
+      return nil
+    }
+    if err == nil {
+      return errors.New("expected pg_sleep query to fail")
+    }
+    return errors.Wrapf(err, "expected pg_sleep query to fail, but it failed with an unexpected error")
+  })
+
+  // Start a txn before the query_wait period begins.
+  tx, err := connWithTxn.Begin(ctx)
+  require.NoError(t, err)
+
+  time.Sleep(pokeDuringQueryWaitDelay)
+  t.Status(fmt.Sprintf("%s after draining starts, server is rejecting new "+
+    "queries on existing SQL connections", pokeDuringQueryWaitDelay))
+
+  // A connection with no open transaction should have been closed
+  // automatically by this point.
+  err = connNoTxn.QueryRow(ctx, "SELECT 1").Scan(&result)
+  if !testutils.IsError(err, "(server is shutting down|connection reset by peer|unexpected EOF)") {
+    require.FailNowf(t, "expected error from trying to use an idle connection", "the actual error was %v", err)
+  }
+
+  // A transaction that was already open should still work.
+  err = tx.QueryRow(ctx, "SELECT 2").Scan(&result)
+  require.NoError(t, err)
+  require.Equal(t, 2, result)
+  err = tx.Commit(ctx)
+  require.NoError(t, err)
+  // And that connection should be closed right after the commit happens.
+  _, err = connWithTxn.Exec(ctx, "SELECT 1")
+  if !testutils.IsError(err, "(server is shutting down|connection reset by peer|unexpected EOF)") {
+    require.FailNowf(t, "expected error from trying to use an idle connection", "the actual error was %v", err)
+  }
+
   err = m.WaitE()
   require.NoError(t, err, "error waiting for the draining to finish")

diff --git a/pkg/sql/pgwire/conn.go b/pkg/sql/pgwire/conn.go
index f20b6ed909d4..853ea614616c 100644
--- a/pkg/sql/pgwire/conn.go
+++ b/pkg/sql/pgwire/conn.go
@@ -598,10 +598,12 @@ func (c *conn) serveImpl(
   // If we're draining, let the client know by piling on an AdminShutdownError
   // and flushing the buffer.
   if draining() {
-    // TODO(andrei): I think sending this extra error to the client if we also
-    // sent another error for the last query (like a context canceled) is a bad
-    // idea; see #22630. I think we should find a way to return the
-    // AdminShutdown error as the only result of the query.
+    // The error here is also sent with pgcode.AdminShutdown, to indicate that
+    // the connection is being closed. Clients are expected to be able to handle
+    // this even when not waiting for a query result. See the discussion at
+    // https://github.com/cockroachdb/cockroach/issues/22630.
+    // NOTE: If a query is canceled due to draining, the conn_executor will
+    // already have sent a QueryCanceled error as a response to the query.
     log.Ops.Info(ctx, "closing existing connection while server is draining")
     _ /* err */ = c.writeErr(ctx, newAdminShutdownErr(ErrDrainingExistingConn), &c.writerState.buf)
     _ /* n */, _ /* err */ = c.writerState.buf.WriteTo(c.conn)
diff --git a/pkg/sql/sqlerrors/errors.go b/pkg/sql/sqlerrors/errors.go
index d308c3bce827..c3f8d04f379c 100644
--- a/pkg/sql/sqlerrors/errors.go
+++ b/pkg/sql/sqlerrors/errors.go
@@ -344,7 +344,7 @@ func NewInvalidVolatilityError(err error) error {
 var QueryTimeoutError = pgerror.New(
   pgcode.QueryCanceled, "query execution canceled due to statement timeout")

-// TxnTimeoutError is an error representing a query timeout.
+// TxnTimeoutError is an error representing a transaction timeout.
 var TxnTimeoutError = pgerror.New(
   pgcode.QueryCanceled, "query execution canceled due to transaction timeout")
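--
Usage note (illustrative, not part of the patch): the checks above rely on the
pgwire errors a draining node sends to clients. Connections the server closes
during drain receive pgcode.AdminShutdown (SQLSTATE 57P01, "server is shutting
down"), and statements canceled by the drain receive pgcode.QueryCanceled
(SQLSTATE 57014). Below is a minimal sketch of how a client application might
classify these errors with pgx/v4 and its companion pgconn package; the
isNodeDraining helper and the connection string are assumptions for the
example, not code from this change.

    package main

    import (
      "context"
      "errors"
      "fmt"

      "github.com/jackc/pgconn"
      "github.com/jackc/pgx/v4"
    )

    // isNodeDraining reports whether err looks like an error sent by a draining
    // node: 57P01 (admin_shutdown) when the server closes the connection, or
    // 57014 (query_canceled) when an in-flight query is canceled.
    func isNodeDraining(err error) bool {
      var pgErr *pgconn.PgError
      if errors.As(err, &pgErr) {
        return pgErr.Code == "57P01" || pgErr.Code == "57014"
      }
      return false
    }

    func main() {
      ctx := context.Background()
      // Hypothetical local insecure node; replace with a real connection string.
      conn, err := pgx.Connect(ctx, "postgres://root@localhost:26257/defaultdb?sslmode=disable")
      if err != nil {
        panic(err)
      }
      defer conn.Close(ctx)

      var one int
      if err := conn.QueryRow(ctx, "SELECT 1").Scan(&one); err != nil {
        if isNodeDraining(err) {
          fmt.Println("node is draining; retry against another node")
          return
        }
        panic(err)
      }
      fmt.Println("query succeeded:", one)
    }

An application that handles these two codes by reconnecting to a different node
can ride through a drain without surfacing errors to its own callers, which is
the behavior the connection_wait and query_wait phases are designed to allow.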