Merge #98390
98390: cli: compute --drain-wait based on cluster setting values r=rafiss a=rafiss

This includes a few commits related to draining.

---

### cli: compute --drain-wait based on cluster setting values

fixes #98388

Release note (cli change): The `--drain-wait` argument for the `drain` command will be automatically increased if the command detects that it is smaller than the sum of server.shutdown.drain_wait, server.shutdown.connection_wait, twice server.shutdown.query_wait, and server.shutdown.lease_transfer_wait.

This recommendation was already documented, but now the advice will be applied automatically.
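
For reference, a minimal, self-contained sketch of the computation (the helper name and the setting values here are illustrative; the real change lives in `pkg/cli/rpc_node_shutdown.go` below):

```go
package main

import (
	"fmt"
	"time"
)

// computeMinDrainWait mirrors the idea behind the change in
// pkg/cli/rpc_node_shutdown.go: the effective --drain-wait must cover the sum
// of the drain-related cluster settings, with query_wait counted twice and a
// 10-second buffer added on top. The function name is illustrative.
func computeMinDrainWait(settings map[string]time.Duration) time.Duration {
	minWait := 10 * time.Second // extra buffer for the timeout
	for name, wait := range settings {
		minWait += wait
		if name == "server.shutdown.query_wait" {
			// query_wait is used twice during draining, so count it twice.
			minWait += wait
		}
	}
	return minWait
}

func main() {
	// Hypothetical cluster setting values.
	settings := map[string]time.Duration{
		"server.shutdown.drain_wait":          10 * time.Second,
		"server.shutdown.connection_wait":     100 * time.Second,
		"server.shutdown.query_wait":          10 * time.Second,
		"server.shutdown.lease_transfer_wait": 5 * time.Second,
	}
	drainWait := 10 * time.Second // value passed via --drain-wait
	if minWait := computeMinDrainWait(settings); minWait > drainWait {
		fmt.Printf("warning: --drain-wait is %s, but the cluster settings require "+
			"a value of at least %s; using the larger value\n", drainWait, minWait)
		drainWait = minWait
	}
	fmt.Println("effective --drain-wait:", drainWait) // 2m25s with these values
}
```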

---

### sql: fix check for closing connExecutor during draining

This fixes a minor bug in which the connection would not get closed at
the earliest possible time during server shutdown.

The connection is supposed to be closed as soon as we handle a Sync
message while the conn_executor is in the draining state and not in a
transaction. Because the transaction state was checked before the state
transition occurred, the connection could remain open slightly longer
than necessary.
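
A toy sketch of the pattern (not the actual conn_executor code; the type and method here are stand-ins) shows why deferring the idle check lets the connection close on the same Sync that ends the transaction:

```go
package main

import (
	"errors"
	"fmt"
)

var errDrainingComplete = errors.New("draining done")

// toyExecutor is a stripped-down stand-in for connExecutor, used only to
// illustrate the ordering fix: the idle check must run *after* the state
// transition performed while handling Sync.
type toyExecutor struct {
	draining bool
	inTxn    bool
}

func (ex *toyExecutor) idleConn() bool { return ex.draining && !ex.inTxn }

// handleSync sketches the fixed pattern: the check is deferred, so it sees the
// post-transition transaction state rather than the state before Sync.
func (ex *toyExecutor) handleSync() (retErr error) {
	if ex.draining {
		defer func() {
			if ex.idleConn() {
				retErr = errDrainingComplete
			}
		}()
	}
	// State transition: the transaction finishes as part of handling Sync.
	ex.inTxn = false
	return nil
}

func main() {
	ex := &toyExecutor{draining: true, inTxn: true}
	// Prints "draining done": the connection can close at the earliest
	// opportunity instead of waiting for the next Sync.
	fmt.Println(ex.handleSync())
}
```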

---

### roachtest: enhance drain test

The test now does much more:
- Checks that --drain-wait is automatically increased if it is set lower
  than the cluster settings require.
- Checks that the /health?ready=1 endpoint fails during the drain_wait
  period (see the sketch after this list).
- Checks for the proper error message during the connection_wait phase.
- Checks for the proper error message when trying to begin a new
  query/transaction during the query_wait phase.
- Checks that an open transaction is allowed to continue during the
  query_wait phase.
- Checks for the proper error message when a query is canceled during
  shutdown.
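
A rough, standalone illustration of that readiness probe (the URL and helper name are placeholders; the real test derives the admin UI address from the cluster):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

// probeReadiness shows the kind of check the roachtest performs during the
// drain_wait phase: the node should report itself unready to load balancers
// via /health?ready=1 while it is still accepting SQL connections.
func probeReadiness(baseURL string) error {
	resp, err := http.Get(baseURL + "/health?ready=1")
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	if resp.StatusCode != http.StatusServiceUnavailable {
		return fmt.Errorf("expected 503 during drain_wait, got %d: %s", resp.StatusCode, body)
	}
	// The body is expected to mention that the node is shutting down.
	fmt.Printf("node reports unready as expected: %s\n", body)
	return nil
}

func main() {
	// Placeholder address for illustration only.
	if err := probeReadiness("http://localhost:8080"); err != nil {
		fmt.Println("probe failed:", err)
	}
}
```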

Co-authored-by: Rafi Shamim <[email protected]>
craig[bot] and rafiss committed Mar 14, 2023
2 parents 0c6ccaf + d6cdd94 commit 19e5845
Showing 5 changed files with 190 additions and 51 deletions.
33 changes: 33 additions & 0 deletions pkg/cli/rpc_node_shutdown.go
@@ -64,6 +64,39 @@ func doDrain(
return doDrainNoTimeout(ctx, c, targetNode)
}

shutdownSettings, err := c.Settings(ctx, &serverpb.SettingsRequest{
Keys: []string{
"server.shutdown.drain_wait",
"server.shutdown.connection_wait",
"server.shutdown.query_wait",
"server.shutdown.lease_transfer_wait",
},
UnredactedValues: true,
})
if err != nil {
return false, true, err
}

// Add an extra buffer of 10 seconds for the timeout.
minWait := 10 * time.Second
for k, v := range shutdownSettings.KeyValues {
wait, err := time.ParseDuration(v.Value)
if err != nil {
return false, true, err
}
minWait += wait
// query_wait is used twice during draining, so count it twice here.
if k == "server.shutdown.query_wait" {
minWait += wait
}
}
if minWait > drainCtx.drainWait {
fmt.Fprintf(stderr, "warning: --drain-wait is %s, but the server.shutdown.{drain,query,connection,lease_transfer}_wait "+
"cluster settings require a value of at least %s; using the larger value\n",
drainCtx.drainWait, minWait)
drainCtx.drainWait = minWait
}

err = contextutil.RunWithTimeout(ctx, "drain", drainCtx.drainWait, func(ctx context.Context) (err error) {
hardError, remainingWork, err = doDrainNoTimeout(ctx, c, targetNode)
return err
179 changes: 141 additions & 38 deletions pkg/cmd/roachtest/tests/drain.go
Expand Up @@ -14,17 +14,23 @@ import (
"context"
gosql "database/sql"
"fmt"
"io"
"math/rand"
"net/http"
"path/filepath"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/jackc/pgx/v4"
"github.com/stretchr/testify/require"
)

@@ -56,20 +62,28 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl
var err error
const (
// Set the duration of each phase of the draining period.
drainWaitDuration = 5 * time.Second
drainWaitDuration = 10 * time.Second
connectionWaitDuration = 100 * time.Second
queryWaitDuration = 10 * time.Second
// pokeDuringConnWaitTimestamp is the timestamp after the server
// starts waiting for SQL connections to close (with the start of the whole
// draining process marked as timestamp 0). It should be set larger than
// drainWaitDuration, but smaller than (drainWaitDuration +
// connectionWaitDuration).
pokeDuringConnWaitTimestamp = 15 * time.Second
connMaxLifetime = 10 * time.Second
connMaxCount = 5
nodeToDrain = 1

// server.shutdown.lease_transfer_wait defaults to 5 seconds.
leaseTransferWaitDuration = 5 * time.Second

// pokeDuringDrainWaitDelay is the amount of time after drain begins when we
// will check that the server is reporting itself as unhealthy to load
// balancers. It should be set smaller than drainWaitDuration.
pokeDuringDrainWaitDelay = 5 * time.Second

// pokeDuringConnWaitDelay is the amount of time after drain begins when we
// will check that the server is waiting for SQL connections to close. It
// should be set larger than drainWaitDuration, but smaller than
// (drainWaitDuration + connectionWaitDuration).
pokeDuringConnWaitDelay = 20 * time.Second

connMaxLifetime = 10 * time.Second
connMaxCount = 5
nodeToDrain = 1
)
totalWaitDuration := drainWaitDuration + connectionWaitDuration + queryWaitDuration

prepareCluster(ctx, t, c, drainWaitDuration, connectionWaitDuration, queryWaitDuration)

@@ -84,9 +98,7 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl
// Get two connections from the connection pools.
for j := 0; j < 2; j++ {
conn, err := db.Conn(ctx)

require.NoError(t, err, "failed to a SQL connection from the connection pool")

conns = append(conns, conn)
}

@@ -95,32 +107,71 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl

m.Go(func(ctx context.Context) error {
t.Status(fmt.Sprintf("start draining node %d", nodeToDrain))
return c.RunE(ctx,
results, err := c.RunWithDetailsSingleNode(
ctx,
t.L(),
c.Node(nodeToDrain),
fmt.Sprintf("./cockroach node drain --insecure --drain-wait=%fs",
totalWaitDuration.Seconds()))
// --drain-wait is set to a low value so that we can confirm that it
// gets automatically upgraded to use a higher value larger than the sum
// of server.shutdown.drain_wait, server.shutdown.connection_wait,
// server.shutdown.query_wait times two, and
// server.shutdown.lease_transfer_wait.
"./cockroach node drain --self --insecure --drain-wait=10s",
)
if err != nil {
return err
}

expectedDrain := drainWaitDuration + connectionWaitDuration + queryWaitDuration*2 + leaseTransferWaitDuration + 10*time.Second
if !strings.Contains(
results.Stderr,
fmt.Sprintf(
"cluster settings require a value of at least %s; using the larger value",
expectedDrain),
) {
return errors.Newf("expected --drain-wait to be upgraded to %s", expectedDrain)
}

return nil
})

drainStartTimestamp := timeutil.Now()

// Sleep till the server is in the status of reporting itself as unhealthy.
// Verify that the server is still allowing new SQL connections now.
time.Sleep(pokeDuringDrainWaitDelay)
t.Status(fmt.Sprintf("%s after draining starts, health check returns false", pokeDuringDrainWaitDelay))
conn, err := db.Conn(ctx)
require.NoError(t, err, "new SQL connection should be allowed during drain_wait")
err = conn.Close()
require.NoError(t, err)
addr, err := c.ExternalAdminUIAddr(ctx, t.L(), c.Node(nodeToDrain))
require.NoError(t, err)
url := `http://` + addr[0] + `/health?ready=1`
resp, err := httputil.Get(ctx, url)
require.NoError(t, err)
defer func() { _ = resp.Body.Close() }()
require.Equalf(t, http.StatusServiceUnavailable, resp.StatusCode, "expected healthcheck to fail during drain")
bodyBytes, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Contains(t, string(bodyBytes), "node is shutting down")

// Sleep till the server is in the status of waiting for users to close SQL
// connections. Verify that the server is rejecting new SQL connections now.
time.Sleep(pokeDuringConnWaitTimestamp)

time.Sleep(drainStartTimestamp.Add(pokeDuringConnWaitDelay).Sub(timeutil.Now()))
t.Status(fmt.Sprintf("%s after draining starts, server is rejecting new SQL connections", pokeDuringConnWaitDelay))
if _, err := db.Conn(ctx); err != nil {
t.Status(fmt.Sprintf("%s after draining starts, server is rejecting "+
"new SQL connections: %v", pokeDuringConnWaitTimestamp, err))
require.ErrorContains(t, err, "server is not accepting clients, try another node")
} else {
t.Fatal(errors.New("new SQL connections should not be allowed when the server " +
"starts waiting for the user to close SQL connections"))
}

require.Equalf(t, db.Stats().OpenConnections, 2, "number of open connections should be 2")

t.Status("number of open connections: ", db.Stats().OpenConnections)
require.Equalf(t, db.Stats().OpenConnections, 2, "number of open connections should be 2")

randConn := conns[rand.Intn(len(conns))]
t.Status("execting sql query with connection %s", randConn)
t.Status("execing sql query with connection")

// When server is waiting clients to close connections, verify that SQL
// queries do not fail.
@@ -134,17 +185,18 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl
"expected connection to be able to be successfully closed client-side")
}

t.Status("all SQL connections are put back to the connection pool")
t.Status("all SQL connections are closed")

err = m.WaitE()
require.NoError(t, err, "error waiting for the draining to finish")

drainEndTimestamp := timeutil.Now()
actualDrainDuration := drainEndTimestamp.Sub(drainStartTimestamp).Seconds()
actualDrainDuration := drainEndTimestamp.Sub(drainStartTimestamp)

t.L().Printf("the draining lasted %f seconds", actualDrainDuration)

if actualDrainDuration >= float64(totalWaitDuration)-10 {
totalWaitDuration := drainWaitDuration + connectionWaitDuration + queryWaitDuration
if actualDrainDuration >= totalWaitDuration-10*time.Second {
t.Fatal(errors.New("the draining process didn't early exit " +
"when waiting for server to close all SQL connections"))
}
@@ -160,30 +212,81 @@ func runTestWarningForConnWait(ctx context.Context, t test.Test, c cluster.Clust
drainWaitDuration = 0 * time.Second
connectionWaitDuration = 10 * time.Second
queryWaitDuration = 20 * time.Second
nodeToDrain = 1
// pokeDuringQueryWaitDelay is the amount of time after drain begins when we
// will check that the server does not allow any new query to begin on a
// connection that is still open. It
// should be set larger than (drainWaitDuration + connectionWaitDuration),
// but smaller than
// (drainWaitDuration + connectionWaitDuration + queryWaitDuration).
pokeDuringQueryWaitDelay = 15 * time.Second
nodeToDrain = 1
)

totalWaitDuration := drainWaitDuration + connectionWaitDuration + queryWaitDuration

prepareCluster(ctx, t, c, drainWaitDuration, connectionWaitDuration, queryWaitDuration)

db := c.Conn(ctx, t.L(), nodeToDrain)
defer db.Close()

// Get a connection from the connection pool.
_, err = db.Conn(ctx)

require.NoError(t, err, "cannot get a SQL connection from the connection pool")
pgURL, err := c.ExternalPGUrl(ctx, t.L(), c.Node(nodeToDrain), "" /* tenant */)
require.NoError(t, err)
connNoTxn, err := pgx.Connect(ctx, pgURL[0])
require.NoError(t, err)
connWithTxn, err := pgx.Connect(ctx, pgURL[0])
require.NoError(t, err)
connWithSleep, err := pgx.Connect(ctx, pgURL[0])
require.NoError(t, err)

m := c.NewMonitor(ctx, c.Node(nodeToDrain))
m.Go(func(ctx context.Context) error {
t.Status(fmt.Sprintf("draining node %d", nodeToDrain))
return c.RunE(ctx,
c.Node(nodeToDrain),
fmt.Sprintf("./cockroach node drain --insecure --drain-wait=%fs",
totalWaitDuration.Seconds()))
"./cockroach node drain --self --insecure --drain-wait=600s",
)
})

// The connection should work still.
var result int
err = connNoTxn.QueryRow(ctx, "SELECT 1").Scan(&result)
require.NoError(t, err)
require.Equal(t, 1, result)

// A query that takes longer than the total wait duration should be canceled.
m.Go(func(ctx context.Context) error {
_, err := connWithSleep.Exec(ctx, "SELECT pg_sleep(200)")
if testutils.IsError(err, "(query execution canceled|server is shutting down|connection reset by peer|unexpected EOF)") {
return nil
}
if err == nil {
return errors.New("expected pg_sleep query to fail")
}
return errors.Wrapf(err, "expected pg_sleep query to fail; but got the wrong error")
})

// Start a txn before the query_wait period begins.
tx, err := connWithTxn.Begin(ctx)
require.NoError(t, err)

time.Sleep(pokeDuringQueryWaitDelay)
t.Status(fmt.Sprintf("%s after draining starts, server is rejecting new "+
"queries on existing SQL connections", pokeDuringQueryWaitDelay))

// A connection with no open transaction should have been closed
// automatically by this point.
err = connNoTxn.QueryRow(ctx, "SELECT 1").Scan(&result)
if !testutils.IsError(err, "(server is shutting down|connection reset by peer|unexpected EOF)") {
require.FailNowf(t, "expected error from trying to use an idle connection", "the actual error was %v", err)
}

// A transaction that was already open should still work.
err = tx.QueryRow(ctx, "SELECT 2").Scan(&result)
require.NoError(t, err)
require.Equal(t, 2, result)
err = tx.Commit(ctx)
require.NoError(t, err)
// And that connection should be closed right after the commit happens.
_, err = connWithTxn.Exec(ctx, "SELECT 1")
if !testutils.IsError(err, "(server is shutting down|connection reset by peer|unexpected EOF)") {
require.FailNowf(t, "expected error from trying to use an idle connection", "the actual error was %v", err)
}

err = m.WaitE()
require.NoError(t, err, "error waiting for the draining to finish")

17 changes: 9 additions & 8 deletions pkg/sql/conn_executor.go
@@ -1911,6 +1911,7 @@ func (ex *connExecutor) run(

var err error
if err = ex.execCmd(); err != nil {
// Both of these errors are normal ways for the connExecutor to exit.
if errors.IsAny(err, io.EOF, errDrainingComplete) {
return nil
}
Expand All @@ -1934,7 +1935,7 @@ var errDrainingComplete = fmt.Errorf("draining done. this is a good time to fini
// Returns drainingComplete if the session should finish because draining is
// complete (i.e. we received a DrainRequest - possibly previously - and the
// connection is found to be idle).
func (ex *connExecutor) execCmd() error {
func (ex *connExecutor) execCmd() (retErr error) {
ctx := ex.Ctx()
cmd, pos, err := ex.stmtBuf.CurCmd()
if err != nil {
@@ -2121,16 +2122,16 @@ func (ex *connExecutor) execCmd() {
// Note that the Sync result will flush results to the network connection.
res = ex.clientComm.CreateSyncResult(pos)
if ex.draining {
// If we're draining, check whether this is a good time to finish the
// If we're draining, then after handing the Sync connExecutor state
// transition, check whether this is a good time to finish the
// connection. If we're not inside a transaction, we stop processing
// now. If we are inside a transaction, we'll check again the next time
// a Sync is processed.
if ex.idleConn() {
// If we're about to close the connection, close res in order to flush
// now, as we won't have an opportunity to do it later.
res.Close(ctx, stateToTxnStatusIndicator(ex.machine.CurState()))
return errDrainingComplete
}
defer func() {
if ex.idleConn() {
retErr = errDrainingComplete
}
}()
}
case CopyIn:
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionQueryReceived, tcmd.TimeReceived)
10 changes: 6 additions & 4 deletions pkg/sql/pgwire/conn.go
@@ -598,10 +598,12 @@ func (c *conn) serveImpl(
// If we're draining, let the client know by piling on an AdminShutdownError
// and flushing the buffer.
if draining() {
// TODO(andrei): I think sending this extra error to the client if we also
// sent another error for the last query (like a context canceled) is a bad
// idea; see #22630. I think we should find a way to return the
// AdminShutdown error as the only result of the query.
// The error here is also sent with pgcode.AdminShutdown, to indicate that
// the connection is being closed. Clients are expected to be able to handle
// this even when not waiting for a query result. See the discussion at
// https://github.com/cockroachdb/cockroach/issues/22630.
// NOTE: If a query is canceled due to draining, the conn_executor already
// will have sent a QueryCanceled error as a response to the query.
log.Ops.Info(ctx, "closing existing connection while server is draining")
_ /* err */ = c.writeErr(ctx, newAdminShutdownErr(ErrDrainingExistingConn), &c.writerState.buf)
_ /* n */, _ /* err */ = c.writerState.buf.WriteTo(c.conn)
2 changes: 1 addition & 1 deletion pkg/sql/sqlerrors/errors.go
@@ -344,7 +344,7 @@ func NewInvalidVolatilityError(err error) error {
var QueryTimeoutError = pgerror.New(
pgcode.QueryCanceled, "query execution canceled due to statement timeout")

// TxnTimeoutError is an error representing a query timeout.
// TxnTimeoutError is an error representing a transaction timeout.
var TxnTimeoutError = pgerror.New(
pgcode.QueryCanceled, "query execution canceled due to transaction timeout")

