Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cli: compute --drain-wait based on cluster setting values #98390

Merged
merged 3 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions pkg/cli/rpc_node_shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,39 @@ func doDrain(
return doDrainNoTimeout(ctx, c, targetNode)
}

shutdownSettings, err := c.Settings(ctx, &serverpb.SettingsRequest{
Keys: []string{
"server.shutdown.drain_wait",
"server.shutdown.connection_wait",
"server.shutdown.query_wait",
"server.shutdown.lease_transfer_wait",
},
UnredactedValues: true,
})
if err != nil {
return false, true, err
}

// Add an extra buffer of 10 seconds for the timeout.
minWait := 10 * time.Second
for k, v := range shutdownSettings.KeyValues {
wait, err := time.ParseDuration(v.Value)
if err != nil {
return false, true, err
}
minWait += wait
// query_wait is used twice during draining, so count it twice here.
if k == "server.shutdown.query_wait" {
minWait += wait
}
}
if minWait > drainCtx.drainWait {
fmt.Fprintf(stderr, "warning: --drain-wait is %s, but the server.shutdown.{drain,query,connection,lease_transfer}_wait "+
"cluster settings require a value of at least %s; using the larger value\n",
drainCtx.drainWait, minWait)
drainCtx.drainWait = minWait
}

err = contextutil.RunWithTimeout(ctx, "drain", drainCtx.drainWait, func(ctx context.Context) (err error) {
hardError, remainingWork, err = doDrainNoTimeout(ctx, c, targetNode)
return err
Expand Down
179 changes: 141 additions & 38 deletions pkg/cmd/roachtest/tests/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,23 @@ import (
"context"
gosql "database/sql"
"fmt"
"io"
"math/rand"
"net/http"
"path/filepath"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/jackc/pgx/v4"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -56,20 +62,28 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl
var err error
const (
// Set the duration of each phase of the draining period.
drainWaitDuration = 5 * time.Second
drainWaitDuration = 10 * time.Second
connectionWaitDuration = 100 * time.Second
queryWaitDuration = 10 * time.Second
// pokeDuringConnWaitTimestamp is the timestamp after the server
// starts waiting for SQL connections to close (with the start of the whole
// draining process marked as timestamp 0). It should be set larger than
// drainWaitDuration, but smaller than (drainWaitDuration +
// connectionWaitDuration).
pokeDuringConnWaitTimestamp = 15 * time.Second
connMaxLifetime = 10 * time.Second
connMaxCount = 5
nodeToDrain = 1

// server.shutdown.lease_transfer_wait defaults to 5 seconds.
leaseTransferWaitDuration = 5 * time.Second

// pokeDuringDrainWaitDelay is the amount of time after drain begins when we
// will check that the server is reporting itself as unhealthy to load
// balancers. It should be set smaller than drainWaitDuration.
pokeDuringDrainWaitDelay = 5 * time.Second

// pokeDuringConnWaitDelay is the amount of time after drain begins when we
// will check that the server is waiting for SQL connections to close. It
// should be set larger than drainWaitDuration, but smaller than
// (drainWaitDuration + connectionWaitDuration).
pokeDuringConnWaitDelay = 20 * time.Second

connMaxLifetime = 10 * time.Second
connMaxCount = 5
nodeToDrain = 1
)
totalWaitDuration := drainWaitDuration + connectionWaitDuration + queryWaitDuration

prepareCluster(ctx, t, c, drainWaitDuration, connectionWaitDuration, queryWaitDuration)

Expand All @@ -84,9 +98,7 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl
// Get two connections from the connection pools.
for j := 0; j < 2; j++ {
conn, err := db.Conn(ctx)

require.NoError(t, err, "failed to a SQL connection from the connection pool")

conns = append(conns, conn)
}

Expand All @@ -95,32 +107,71 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl

m.Go(func(ctx context.Context) error {
t.Status(fmt.Sprintf("start draining node %d", nodeToDrain))
return c.RunE(ctx,
results, err := c.RunWithDetailsSingleNode(
ctx,
t.L(),
c.Node(nodeToDrain),
fmt.Sprintf("./cockroach node drain --insecure --drain-wait=%fs",
totalWaitDuration.Seconds()))
// --drain-wait is set to a low value so that we can confirm that it
// gets automatically upgraded to use a higher value larger than the sum
// of server.shutdown.drain_wait, server.shutdown.connection_wait,
// server.shutdown.query_wait times two, and
// server.shutdown.lease_transfer_wait.
"./cockroach node drain --self --insecure --drain-wait=10s",
)
if err != nil {
return err
}

expectedDrain := drainWaitDuration + connectionWaitDuration + queryWaitDuration*2 + leaseTransferWaitDuration + 10*time.Second
if !strings.Contains(
results.Stderr,
fmt.Sprintf(
"cluster settings require a value of at least %s; using the larger value",
expectedDrain),
) {
return errors.Newf("expected --drain-wait to be upgraded to %s", expectedDrain)
}

return nil
})

drainStartTimestamp := timeutil.Now()

// Sleep till the server is in the status of reporting itself as unhealthy.
// Verify that the server is still allowing new SQL connections now.
time.Sleep(pokeDuringDrainWaitDelay)
t.Status(fmt.Sprintf("%s after draining starts, health check returns false", pokeDuringDrainWaitDelay))
conn, err := db.Conn(ctx)
require.NoError(t, err, "new SQL connection should be allowed during drain_wait")
err = conn.Close()
require.NoError(t, err)
addr, err := c.ExternalAdminUIAddr(ctx, t.L(), c.Node(nodeToDrain))
require.NoError(t, err)
url := `http://` + addr[0] + `/health?ready=1`
resp, err := httputil.Get(ctx, url)
require.NoError(t, err)
defer func() { _ = resp.Body.Close() }()
require.Equalf(t, http.StatusServiceUnavailable, resp.StatusCode, "expected healthcheck to fail during drain")
bodyBytes, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Contains(t, string(bodyBytes), "node is shutting down")

// Sleep till the server is in the status of waiting for users to close SQL
// connections. Verify that the server is rejecting new SQL connections now.
time.Sleep(pokeDuringConnWaitTimestamp)

time.Sleep(drainStartTimestamp.Add(pokeDuringConnWaitDelay).Sub(timeutil.Now()))
t.Status(fmt.Sprintf("%s after draining starts, server is rejecting new SQL connections", pokeDuringConnWaitDelay))
if _, err := db.Conn(ctx); err != nil {
t.Status(fmt.Sprintf("%s after draining starts, server is rejecting "+
"new SQL connections: %v", pokeDuringConnWaitTimestamp, err))
require.ErrorContains(t, err, "server is not accepting clients, try another node")
} else {
t.Fatal(errors.New("new SQL connections should not be allowed when the server " +
"starts waiting for the user to close SQL connections"))
}

require.Equalf(t, db.Stats().OpenConnections, 2, "number of open connections should be 2")

t.Status("number of open connections: ", db.Stats().OpenConnections)
require.Equalf(t, db.Stats().OpenConnections, 2, "number of open connections should be 2")

randConn := conns[rand.Intn(len(conns))]
t.Status("execting sql query with connection %s", randConn)
t.Status("execing sql query with connection")

// When server is waiting clients to close connections, verify that SQL
// queries do not fail.
Expand All @@ -134,17 +185,18 @@ func runEarlyExitInConnectionWait(ctx context.Context, t test.Test, c cluster.Cl
"expected connection to be able to be successfully closed client-side")
}

t.Status("all SQL connections are put back to the connection pool")
t.Status("all SQL connections are closed")

err = m.WaitE()
require.NoError(t, err, "error waiting for the draining to finish")

drainEndTimestamp := timeutil.Now()
actualDrainDuration := drainEndTimestamp.Sub(drainStartTimestamp).Seconds()
actualDrainDuration := drainEndTimestamp.Sub(drainStartTimestamp)

t.L().Printf("the draining lasted %f seconds", actualDrainDuration)

if actualDrainDuration >= float64(totalWaitDuration)-10 {
totalWaitDuration := drainWaitDuration + connectionWaitDuration + queryWaitDuration
if actualDrainDuration >= totalWaitDuration-10*time.Second {
t.Fatal(errors.New("the draining process didn't early exit " +
"when waiting for server to close all SQL connections"))
}
Expand All @@ -160,30 +212,81 @@ func runTestWarningForConnWait(ctx context.Context, t test.Test, c cluster.Clust
drainWaitDuration = 0 * time.Second
connectionWaitDuration = 10 * time.Second
queryWaitDuration = 20 * time.Second
nodeToDrain = 1
// pokeDuringQueryWaitDelay is the amount of time after drain begins when we
// will check that the server does not allow any new query to begin on a
// connection that is still open. It
// should be set larger than (drainWaitDuration + connectionWaitDuration),
// but smaller than
// (drainWaitDuration + connectionWaitDuration + queryWaitDuration).
pokeDuringQueryWaitDelay = 15 * time.Second
nodeToDrain = 1
)

totalWaitDuration := drainWaitDuration + connectionWaitDuration + queryWaitDuration

prepareCluster(ctx, t, c, drainWaitDuration, connectionWaitDuration, queryWaitDuration)

db := c.Conn(ctx, t.L(), nodeToDrain)
defer db.Close()

// Get a connection from the connection pool.
_, err = db.Conn(ctx)

require.NoError(t, err, "cannot get a SQL connection from the connection pool")
pgURL, err := c.ExternalPGUrl(ctx, t.L(), c.Node(nodeToDrain), "" /* tenant */)
require.NoError(t, err)
connNoTxn, err := pgx.Connect(ctx, pgURL[0])
require.NoError(t, err)
connWithTxn, err := pgx.Connect(ctx, pgURL[0])
require.NoError(t, err)
connWithSleep, err := pgx.Connect(ctx, pgURL[0])
require.NoError(t, err)

m := c.NewMonitor(ctx, c.Node(nodeToDrain))
m.Go(func(ctx context.Context) error {
t.Status(fmt.Sprintf("draining node %d", nodeToDrain))
return c.RunE(ctx,
c.Node(nodeToDrain),
fmt.Sprintf("./cockroach node drain --insecure --drain-wait=%fs",
totalWaitDuration.Seconds()))
"./cockroach node drain --self --insecure --drain-wait=600s",
)
})

// The connection should work still.
var result int
err = connNoTxn.QueryRow(ctx, "SELECT 1").Scan(&result)
require.NoError(t, err)
require.Equal(t, 1, result)

// A query that takes longer than the total wait duration should be canceled.
m.Go(func(ctx context.Context) error {
_, err := connWithSleep.Exec(ctx, "SELECT pg_sleep(200)")
if testutils.IsError(err, "(query execution canceled|server is shutting down|connection reset by peer|unexpected EOF)") {
return nil
}
if err == nil {
return errors.New("expected pg_sleep query to fail")
}
return errors.Wrapf(err, "expected pg_sleep query to fail; but got the wrong error")
})

// Start a txn before the query_wait period begins.
tx, err := connWithTxn.Begin(ctx)
require.NoError(t, err)

time.Sleep(pokeDuringQueryWaitDelay)
t.Status(fmt.Sprintf("%s after draining starts, server is rejecting new "+
"queries on existing SQL connections", pokeDuringQueryWaitDelay))

// A connection with no open transaction should have been closed
// automatically by this point.
err = connNoTxn.QueryRow(ctx, "SELECT 1").Scan(&result)
if !testutils.IsError(err, "(server is shutting down|connection reset by peer|unexpected EOF)") {
require.FailNowf(t, "expected error from trying to use an idle connection", "the actual error was %v", err)
}

// A transaction that was already open should still work.
err = tx.QueryRow(ctx, "SELECT 2").Scan(&result)
require.NoError(t, err)
require.Equal(t, 2, result)
err = tx.Commit(ctx)
require.NoError(t, err)
// And that connection should be closed right after the commit happens.
_, err = connWithTxn.Exec(ctx, "SELECT 1")
if !testutils.IsError(err, "(server is shutting down|connection reset by peer|unexpected EOF)") {
require.FailNowf(t, "expected error from trying to use an idle connection", "the actual error was %v", err)
}

err = m.WaitE()
require.NoError(t, err, "error waiting for the draining to finish")

Expand Down
17 changes: 9 additions & 8 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1911,6 +1911,7 @@ func (ex *connExecutor) run(

var err error
if err = ex.execCmd(); err != nil {
// Both of these errors are normal ways for the connExecutor to exit.
if errors.IsAny(err, io.EOF, errDrainingComplete) {
return nil
}
Expand All @@ -1934,7 +1935,7 @@ var errDrainingComplete = fmt.Errorf("draining done. this is a good time to fini
// Returns drainingComplete if the session should finish because draining is
// complete (i.e. we received a DrainRequest - possibly previously - and the
// connection is found to be idle).
func (ex *connExecutor) execCmd() error {
func (ex *connExecutor) execCmd() (retErr error) {
ctx := ex.Ctx()
cmd, pos, err := ex.stmtBuf.CurCmd()
if err != nil {
Expand Down Expand Up @@ -2121,16 +2122,16 @@ func (ex *connExecutor) execCmd() error {
// Note that the Sync result will flush results to the network connection.
res = ex.clientComm.CreateSyncResult(pos)
if ex.draining {
// If we're draining, check whether this is a good time to finish the
// If we're draining, then after handing the Sync connExecutor state
// transition, check whether this is a good time to finish the
// connection. If we're not inside a transaction, we stop processing
// now. If we are inside a transaction, we'll check again the next time
// a Sync is processed.
if ex.idleConn() {
// If we're about to close the connection, close res in order to flush
// now, as we won't have an opportunity to do it later.
res.Close(ctx, stateToTxnStatusIndicator(ex.machine.CurState()))
return errDrainingComplete
}
defer func() {
if ex.idleConn() {
retErr = errDrainingComplete
}
}()
}
case CopyIn:
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionQueryReceived, tcmd.TimeReceived)
Expand Down
10 changes: 6 additions & 4 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,10 +598,12 @@ func (c *conn) serveImpl(
// If we're draining, let the client know by piling on an AdminShutdownError
// and flushing the buffer.
if draining() {
// TODO(andrei): I think sending this extra error to the client if we also
// sent another error for the last query (like a context canceled) is a bad
// idea; see #22630. I think we should find a way to return the
// AdminShutdown error as the only result of the query.
// The error here is also sent with pgcode.AdminShutdown, to indicate that
// the connection is being closed. Clients are expected to be able to handle
// this even when not waiting for a query result. See the discussion at
// https://github.com/cockroachdb/cockroach/issues/22630.
// NOTE: If a query is canceled due to draining, the conn_executor already
// will have sent a QueryCanceled error as a response to the query.
log.Ops.Info(ctx, "closing existing connection while server is draining")
_ /* err */ = c.writeErr(ctx, newAdminShutdownErr(ErrDrainingExistingConn), &c.writerState.buf)
_ /* n */, _ /* err */ = c.writerState.buf.WriteTo(c.conn)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/sqlerrors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func NewInvalidVolatilityError(err error) error {
var QueryTimeoutError = pgerror.New(
pgcode.QueryCanceled, "query execution canceled due to statement timeout")

// TxnTimeoutError is an error representing a query timeout.
// TxnTimeoutError is an error representing a transasction timeout.
var TxnTimeoutError = pgerror.New(
pgcode.QueryCanceled, "query execution canceled due to transaction timeout")

Expand Down