roachtest: enhance drain test
The test now does much more:
- Check that --drain-wait is automatically increased if it is set lower
  than the cluster settings require.
- Check that the /health?ready=1 endpoint fails during the drain_wait
  period.
- Check for the proper error message during the connection_wait phase.
- Check for the proper error message when trying to begin a new
  query/transaction during the query_wait phase.
- Check that an open transaction is allowed to continue during the
  query_wait phase.
- Check for the proper error message when a query is canceled during
  shutdown.

Release note: None
rafiss committed Mar 13, 2023
1 parent b05859d commit 3274081
Showing 4 changed files with 150 additions and 45 deletions.
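As background for the readiness check in the second bullet of the commit message: during the server.shutdown.drain_wait phase a draining node keeps serving SQL but reports itself as unready to load balancers. A minimal standalone probe, not part of this commit and assuming an insecure node with the default HTTP port 8080, would look roughly like this:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Probe the readiness endpoint; while the node is in drain_wait it is
	// expected to answer 503 with a body containing "node is shutting down".
	resp, err := http.Get("http://localhost:8080/health?ready=1")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.StatusCode, string(body))
}
```

The roachtest below performs the same check through the httputil helper and asserts on the 503 status and the error text.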
4 changes: 2 additions & 2 deletions pkg/cli/rpc_node_shutdown.go
@@ -91,8 +91,8 @@ func doDrain(
}
}
if minWait > drainCtx.drainWait {
log.Infof(ctx, "--drain-wait is %s, but the server.shutdown.{drain,query,connection,lease_transfer}_wait "+
"cluster settings require a value of at least %s; using the larger value",
fmt.Fprintf(stderr, "warning: --drain-wait is %s, but the server.shutdown.{drain,query,connection,lease_transfer}_wait "+
"cluster settings require a value of at least %s; using the larger value\n",
drainCtx.drainWait, minWait)
drainCtx.drainWait = minWait
}
179 changes: 141 additions & 38 deletions pkg/cmd/roachtest/tests/drain.go
@@ -14,17 +14,23 @@ import (
"context"
gosql "database/sql"
"fmt"
"io"
"math/rand"
"net/http"
"path/filepath"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/jackc/pgx/v4"
"github.com/stretchr/testify/require"
)

@@ -56,20 +62,28 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl
var err error
const (
// Set the duration of each phase of the draining period.
drainWaitDuration = 5 * time.Second
drainWaitDuration = 10 * time.Second
connectionWaitDuration = 100 * time.Second
queryWaitDuration = 10 * time.Second
// pokeDuringConnWaitTimestamp is the timestamp after the server
// starts waiting for SQL connections to close (with the start of the whole
// draining process marked as timestamp 0). It should be set larger than
// drainWaitDuration, but smaller than (drainWaitDuration +
// connectionWaitDuration).
pokeDuringConnWaitTimestamp = 15 * time.Second
connMaxLifetime = 10 * time.Second
connMaxCount = 5
nodeToDrain = 1

// server.shutdown.lease_transfer_wait defaults to 5 seconds.
leaseTransferWaitDuration = 5 * time.Second

// pokeDuringDrainWaitDelay is the amount of time after drain begins when we
// will check that the server is reporting itself as unhealthy to load
// balancers. It should be set smaller than drainWaitDuration.
pokeDuringDrainWaitDelay = 5 * time.Second

// pokeDuringConnWaitDelay is the amount of time after drain begins when we
// will check that the server is waiting for SQL connections to close. It
// should be set larger than drainWaitDuration, but smaller than
// (drainWaitDuration + connectionWaitDuration).
pokeDuringConnWaitDelay = 20 * time.Second

connMaxLifetime = 10 * time.Second
connMaxCount = 5
nodeToDrain = 1
)
totalWaitDuration := drainWaitDuration + connectionWaitDuration + queryWaitDuration

prepareCluster(ctx, t, c, drainWaitDuration, connectionWaitDuration, queryWaitDuration)

@@ -84,9 +98,7 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl
// Get two connections from the connection pools.
for j := 0; j < 2; j++ {
conn, err := db.Conn(ctx)

require.NoError(t, err, "failed to a SQL connection from the connection pool")

conns = append(conns, conn)
}

@@ -95,32 +107,71 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl

m.Go(func(ctx context.Context) error {
t.Status(fmt.Sprintf("start draining node %d", nodeToDrain))
return c.RunE(ctx,
results, err := c.RunWithDetailsSingleNode(
ctx,
t.L(),
c.Node(nodeToDrain),
fmt.Sprintf("./cockroach node drain --insecure --drain-wait=%fs",
totalWaitDuration.Seconds()))
// --drain-wait is set to a low value so that we can confirm that it
// gets automatically upgraded to use a higher value larger than the sum
// of server.shutdown.drain_wait, server.shutdown.connection_wait,
// server.shutdown.query_wait times two, and
// server.shutdown.lease_transfer_wait.
"./cockroach node drain --self --insecure --drain-wait=10s",
)
if err != nil {
return err
}

expectedDrain := drainWaitDuration + connectionWaitDuration + queryWaitDuration*2 + leaseTransferWaitDuration + 10*time.Second
if !strings.Contains(
results.Stderr,
fmt.Sprintf(
"cluster settings require a value of at least %s; using the larger value",
expectedDrain),
) {
return errors.Newf("expected --drain-wait to be upgraded to %s", expectedDrain)
}

return nil
})

drainStartTimestamp := timeutil.Now()

// Sleep till the server is in the status of reporting itself as unhealthy.
// Verify that the server is still allowing new SQL connections now.
time.Sleep(pokeDuringDrainWaitDelay)
t.Status(fmt.Sprintf("%s after draining starts, health check returns false", pokeDuringDrainWaitDelay))
conn, err := db.Conn(ctx)
require.NoError(t, err, "new SQL connection should be allowed during drain_wait")
err = conn.Close()
require.NoError(t, err)
addr, err := c.ExternalAdminUIAddr(ctx, t.L(), c.Node(nodeToDrain))
require.NoError(t, err)
url := `http://` + addr[0] + `/health?ready=1`
resp, err := httputil.Get(ctx, url)
require.NoError(t, err)
defer func() { _ = resp.Body.Close() }()
require.Equalf(t, http.StatusServiceUnavailable, resp.StatusCode, "expected healthcheck to fail during drain")
bodyBytes, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Contains(t, string(bodyBytes), "node is shutting down")

// Sleep till the server is in the status of waiting for users to close SQL
// connections. Verify that the server is rejecting new SQL connections now.
time.Sleep(pokeDuringConnWaitTimestamp)

time.Sleep(drainStartTimestamp.Add(pokeDuringConnWaitDelay).Sub(timeutil.Now()))
t.Status(fmt.Sprintf("%s after draining starts, server is rejecting new SQL connections", pokeDuringConnWaitDelay))
if _, err := db.Conn(ctx); err != nil {
t.Status(fmt.Sprintf("%s after draining starts, server is rejecting "+
"new SQL connections: %v", pokeDuringConnWaitTimestamp, err))
require.ErrorContains(t, err, "server is not accepting clients, try another node")
} else {
t.Fatal(errors.New("new SQL connections should not be allowed when the server " +
"starts waiting for the user to close SQL connections"))
}

require.Equalf(t, db.Stats().OpenConnections, 2, "number of open connections should be 2")

t.Status("number of open connections: ", db.Stats().OpenConnections)
require.Equalf(t, db.Stats().OpenConnections, 2, "number of open connections should be 2")

randConn := conns[rand.Intn(len(conns))]
t.Status("execting sql query with connection %s", randConn)
t.Status("execing sql query with connection")

// When server is waiting clients to close connections, verify that SQL
// queries do not fail.
@@ -134,17 +185,18 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl
"expected connection to be able to be successfully closed client-side")
}

t.Status("all SQL connections are put back to the connection pool")
t.Status("all SQL connections are closed")

err = m.WaitE()
require.NoError(t, err, "error waiting for the draining to finish")

drainEndTimestamp := timeutil.Now()
actualDrainDuration := drainEndTimestamp.Sub(drainStartTimestamp).Seconds()
actualDrainDuration := drainEndTimestamp.Sub(drainStartTimestamp)

t.L().Printf("the draining lasted %f seconds", actualDrainDuration)

if actualDrainDuration >= float64(totalWaitDuration)-10 {
totalWaitDuration := drainWaitDuration + connectionWaitDuration + queryWaitDuration
if actualDrainDuration >= totalWaitDuration-10*time.Second {
t.Fatal(errors.New("the draining process didn't early exit " +
"when waiting for server to close all SQL connections"))
}
@@ -160,30 +212,81 @@ func runTestWarningForConnWait(ctx context.Context, t test.Test, c cluster.Clust
drainWaitDuration = 0 * time.Second
connectionWaitDuration = 10 * time.Second
queryWaitDuration = 20 * time.Second
nodeToDrain = 1
// pokeDuringQueryWaitDelay is the amount of time after drain begins when we
// will check that the server does not allow any new query to begin on a
// connection that is still open. It
// should be set larger than (drainWaitDuration + connectionWaitDuration),
// but smaller than
// (drainWaitDuration + connectionWaitDuration + queryWaitDuration).
pokeDuringQueryWaitDelay = 15 * time.Second
nodeToDrain = 1
)

totalWaitDuration := drainWaitDuration + connectionWaitDuration + queryWaitDuration

prepareCluster(ctx, t, c, drainWaitDuration, connectionWaitDuration, queryWaitDuration)

db := c.Conn(ctx, t.L(), nodeToDrain)
defer db.Close()

// Get a connection from the connection pool.
_, err = db.Conn(ctx)

require.NoError(t, err, "cannot get a SQL connection from the connection pool")
pgURL, err := c.ExternalPGUrl(ctx, t.L(), c.Node(nodeToDrain), "" /* tenant */)
require.NoError(t, err)
connNoTxn, err := pgx.Connect(ctx, pgURL[0])
require.NoError(t, err)
connWithTxn, err := pgx.Connect(ctx, pgURL[0])
require.NoError(t, err)
connWithSleep, err := pgx.Connect(ctx, pgURL[0])
require.NoError(t, err)

m := c.NewMonitor(ctx, c.Node(nodeToDrain))
m.Go(func(ctx context.Context) error {
t.Status(fmt.Sprintf("draining node %d", nodeToDrain))
return c.RunE(ctx,
c.Node(nodeToDrain),
fmt.Sprintf("./cockroach node drain --insecure --drain-wait=%fs",
totalWaitDuration.Seconds()))
"./cockroach node drain --self --insecure --drain-wait=600s",
)
})

// The connection should work still.
var result int
err = connNoTxn.QueryRow(ctx, "SELECT 1").Scan(&result)
require.NoError(t, err)
require.Equal(t, 1, result)

// A query that takes longer than the total wait duration should be canceled.
m.Go(func(ctx context.Context) error {
_, err := connWithSleep.Exec(ctx, "SELECT pg_sleep(200)")
if testutils.IsError(err, "(query execution canceled|server is shutting down|connection reset by peer|unexpected EOF)") {
return nil
}
if err == nil {
return errors.New("expected pg_sleep query to fail")
}
return errors.Wrapf(err, "expected pg_sleep query to fail; but got the wrong error")
})

// Start a txn before the query_wait period begins.
tx, err := connWithTxn.Begin(ctx)
require.NoError(t, err)

time.Sleep(pokeDuringQueryWaitDelay)
t.Status(fmt.Sprintf("%s after draining starts, server is rejecting new "+
"queries on existing SQL connections", pokeDuringQueryWaitDelay))

// A connection with no open transaction should have been closed
// automatically by this point.
err = connNoTxn.QueryRow(ctx, "SELECT 1").Scan(&result)
if !testutils.IsError(err, "(server is shutting down|connection reset by peer|unexpected EOF)") {
require.FailNowf(t, "expected error from trying to use an idle connection", "the actual error was %v", err)
}

// A transaction that was already open should still work.
err = tx.QueryRow(ctx, "SELECT 2").Scan(&result)
require.NoError(t, err)
require.Equal(t, 2, result)
err = tx.Commit(ctx)
require.NoError(t, err)
// And that connection should be closed right after the commit happens.
_, err = connWithTxn.Exec(ctx, "SELECT 1")
if !testutils.IsError(err, "(server is shutting down|connection reset by peer|unexpected EOF)") {
require.FailNowf(t, "expected error from trying to use an idle connection", "the actual error was %v", err)
}

err = m.WaitE()
require.NoError(t, err, "error waiting for the draining to finish")

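prepareCluster is not shown in this diff; presumably it applies the per-phase cluster settings that the constants above encode. A hedged sketch of that setup, using database/sql against an assumed insecure local node and the durations from runEarlyExitInConnectionWait, could look like:

```go
package main

import (
	"database/sql"
	"fmt"

	_ "github.com/lib/pq"
)

func main() {
	// Assumed insecure local node; the values mirror drainWaitDuration,
	// connectionWaitDuration, and queryWaitDuration from the test above.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		panic(err)
	}
	defer db.Close()
	settings := map[string]string{
		"server.shutdown.drain_wait":      "10s",
		"server.shutdown.connection_wait": "100s",
		"server.shutdown.query_wait":      "10s",
	}
	for name, value := range settings {
		if _, err := db.Exec(fmt.Sprintf("SET CLUSTER SETTING %s = '%s'", name, value)); err != nil {
			panic(err)
		}
	}
}
```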
10 changes: 6 additions & 4 deletions pkg/sql/pgwire/conn.go
@@ -598,10 +598,12 @@ func (c *conn) serveImpl(
// If we're draining, let the client know by piling on an AdminShutdownError
// and flushing the buffer.
if draining() {
// TODO(andrei): I think sending this extra error to the client if we also
// sent another error for the last query (like a context canceled) is a bad
// idea; see #22630. I think we should find a way to return the
// AdminShutdown error as the only result of the query.
// The error here is also sent with pgcode.AdminShutdown, to indicate that
// the connection is being closed. Clients are expected to be able to handle
// this even when not waiting for a query result. See the discussion at
// https://github.com/cockroachdb/cockroach/issues/22630.
// NOTE: If a query is canceled due to draining, the conn_executor already
// will have sent a QueryCanceled error as a response to the query.
log.Ops.Info(ctx, "closing existing connection while server is draining")
_ /* err */ = c.writeErr(ctx, newAdminShutdownErr(ErrDrainingExistingConn), &c.writerState.buf)
_ /* n */, _ /* err */ = c.writerState.buf.WriteTo(c.conn)
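The new comment above notes that clients are expected to handle the pgcode.AdminShutdown error arriving on a draining connection even when no query is in flight. A small client-side sketch, not part of this commit, of classifying that error with pgx (SQLSTATE 57P01 is the standard admin_shutdown code) might be:

```go
package drainclient

import (
	"errors"

	"github.com/jackc/pgconn"
)

// isAdminShutdown reports whether err is the error a draining node sends
// when it closes an existing pgwire connection (SQLSTATE 57P01).
func isAdminShutdown(err error) bool {
	var pgErr *pgconn.PgError
	if errors.As(err, &pgErr) {
		return pgErr.Code == "57P01"
	}
	return false
}
```

A pooled client could use a check like this to retry on another node instead of surfacing the error to the application.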
2 changes: 1 addition & 1 deletion pkg/sql/sqlerrors/errors.go
@@ -344,7 +344,7 @@ func NewInvalidVolatilityError(err error) error {
var QueryTimeoutError = pgerror.New(
pgcode.QueryCanceled, "query execution canceled due to statement timeout")

// TxnTimeoutError is an error representing a query timeout.
// TxnTimeoutError is an error representing a transaction timeout.
var TxnTimeoutError = pgerror.New(
pgcode.QueryCanceled, "query execution canceled due to transaction timeout")

