Skip to content

Commit

Permalink
logictest: change logicTest to store per-node clients for users
Browse files Browse the repository at this point in the history
This patch changes the `clients` field within `logicTest` from type
`map[string]*gosql.DB` to `map[string]map[int]*gosql.DB` in order to
store per-node clients for each user. This allows the `user` cmd in
logic tests to fetch a client for the specified node instead of the
previously used node.

Release note: None
  • Loading branch information
andyyang890 committed Feb 2, 2023
1 parent 22244a7 commit 9470850
Showing 1 changed file with 120 additions and 104 deletions.
224 changes: 120 additions & 104 deletions pkg/sql/logictest/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,8 @@ import (
// Changes the user for subsequent statements or queries.
// If nodeidx is specified, this user will connect to the node
// in the cluster with index N (note this is 0-indexed, while
// node IDs themselves are 1-indexed).
// node IDs themselves are 1-indexed). Otherwise, it will connect
// to the node with index 0 (node ID 1).
//
// - upgrade N
// When using a cockroach-go/testserver logictest, upgrades the node at
Expand Down Expand Up @@ -982,9 +983,10 @@ type logicTest struct {
// If this test uses a SQL tenant server, this is its address. In this case,
// all clients are created against this tenant.
tenantAddrs []string
// map of built clients. Needs to be persisted so that we can
// re-use them and close them all on exit.
clients map[string]*gosql.DB
// map of built clients, keyed first on username and then node idx.
// They are persisted so that they can be reused. They are not closed
// until the end of a test.
clients map[string]map[int]*gosql.DB
// client currently in use. This can change during processing
// of a test input file when encountering the "user" directive.
// see setUser() for details.
Expand Down Expand Up @@ -1149,26 +1151,19 @@ func (t *logicTest) outf(format string, args ...interface{}) {
fmt.Printf("[%s] %s\n", now, msg)
}

// setUser sets the DB client to the specified user.
// setUser sets the DB client to the specified user and connects
// to the node in the cluster at index nodeIdx.
// It returns a cleanup function to be run when the credentials
// are no longer needed.
func (t *logicTest) setUser(user string, nodeIdxOverride int) func() {
if t.clients == nil {
t.clients = map[string]*gosql.DB{}
}
if db, ok := t.clients[user]; ok {
func (t *logicTest) setUser(user string, nodeIdx int) func() {
if db, ok := t.clients[user][nodeIdx]; ok {
t.db = db
t.user = user

// No cleanup necessary, but return a no-op func to avoid nil pointer dereference.
return func() {}
}

nodeIdx := t.nodeIdx
if nodeIdxOverride > 0 {
nodeIdx = nodeIdxOverride
}

var addr string
var pgURL url.URL
var pgUser string
Expand Down Expand Up @@ -1202,9 +1197,16 @@ func (t *logicTest) setUser(user string, nodeIdxOverride int) func() {
if _, err := db.Exec("SET index_recommendations_enabled = false"); err != nil {
t.Fatal(err)
}
t.clients[user] = db
if t.clients == nil {
t.clients = make(map[string]map[int]*gosql.DB)
}
if t.clients[user] == nil {
t.clients[user] = make(map[int]*gosql.DB)
}
t.clients[user][nodeIdx] = db
t.db = db
t.user = pgUser
t.nodeIdx = nodeIdx

return cleanupFunc
}
Expand Down Expand Up @@ -1295,7 +1297,7 @@ func (t *logicTest) newTestServerCluster(bootstrapBinaryPath string, upgradeBina
}

t.testserverCluster = ts
t.clusterCleanupFuncs = append(t.clusterCleanupFuncs, t.setUser(username.RootUser, 0 /* nodeIdxOverride */))
t.clusterCleanupFuncs = append(t.clusterCleanupFuncs, t.setUser(username.RootUser, 0 /* nodeIdx */))
t.clusterCleanupFuncs = append(t.clusterCleanupFuncs, ts.Stop)
}

Expand Down Expand Up @@ -1712,7 +1714,7 @@ func (t *logicTest) newCluster(

// db may change over the lifetime of this function, with intermediate
// values cached in t.clients and finally closed in t.close().
t.clusterCleanupFuncs = append(t.clusterCleanupFuncs, t.setUser(username.RootUser, 0 /* nodeIdxOverride */))
t.clusterCleanupFuncs = append(t.clusterCleanupFuncs, t.setUser(username.RootUser, 0 /* nodeIdx */))
}

// waitForTenantReadOnlyClusterSettingToTakeEffectOrFatal waits until all tenant
Expand Down Expand Up @@ -1765,8 +1767,10 @@ func (t *logicTest) shutdownCluster() {
t.cluster = nil
}
if t.clients != nil {
for _, c := range t.clients {
c.Close()
for _, userClients := range t.clients {
for _, c := range userClients {
c.Close()
}
}
t.clients = nil
}
Expand Down Expand Up @@ -2164,22 +2168,24 @@ func (t *logicTest) processTestFile(path string, config logictestbase.TestCluste
}

func (t *logicTest) hasOpenTxns(ctx context.Context) bool {
for _, user := range t.clients {
existingTxnPriority := "NORMAL"
err := user.QueryRow("SHOW TRANSACTION PRIORITY").Scan(&existingTxnPriority)
if err != nil {
// If we are unable to see transaction priority assume we're in the middle
// of an explicit txn.
log.Warningf(ctx, "failed to check txn priority with %v", err)
return true
}
if _, err := user.Exec("SET TRANSACTION PRIORITY NORMAL;"); !testutils.IsError(err, "there is no transaction in progress") {
// Reset the txn priority to what it was before we checked for open txns.
_, err := user.Exec(fmt.Sprintf(`SET TRANSACTION PRIORITY %s`, existingTxnPriority))
for _, userClients := range t.clients {
for _, user := range userClients {
existingTxnPriority := "NORMAL"
err := user.QueryRow("SHOW TRANSACTION PRIORITY").Scan(&existingTxnPriority)
if err != nil {
log.Warningf(ctx, "failed to reset txn priority with %v", err)
// If we are unable to see transaction priority assume we're in the middle
// of an explicit txn.
log.Warningf(ctx, "failed to check txn priority with %v", err)
return true
}
if _, err := user.Exec("SET TRANSACTION PRIORITY NORMAL;"); !testutils.IsError(err, "there is no transaction in progress") {
// Reset the txn priority to what it was before we checked for open txns.
_, err := user.Exec(fmt.Sprintf(`SET TRANSACTION PRIORITY %s`, existingTxnPriority))
if err != nil {
log.Warningf(ctx, "failed to reset txn priority with %v", err)
}
return true
}
return true
}
}
return false
Expand Down Expand Up @@ -2213,8 +2219,9 @@ func (t *logicTest) maybeBackupRestore(
}

oldUser := t.user
oldNodeIdx := t.nodeIdx
defer func() {
t.setUser(oldUser, 0 /* nodeIdxOverride */)
t.setUser(oldUser, oldNodeIdx)
}()

log.Info(context.Background(), "Running cluster backup and restore")
Expand All @@ -2226,52 +2233,56 @@ func (t *logicTest) maybeBackupRestore(
// TODO(adityamaru): A better approach might be to wipe the cluster once we
// have a command that enables this. That way all of the session data will not
// be lost in the process of creating a new cluster.
users := make([]string, 0, len(t.clients))
userToHexSession := make(map[string]string, len(t.clients))
userToSessionVars := make(map[string]map[string]string, len(t.clients))
for user := range t.clients {
t.setUser(user, 0 /* nodeIdxOverride */)
users = append(users, user)

// Serialize session variables.
var userSession string
var err error
if err = t.db.QueryRow(`SELECT encode(crdb_internal.serialize_session(), 'hex')`).Scan(&userSession); err == nil {
userToHexSession[user] = userSession
continue
}
log.Warningf(context.Background(), "failed to serialize session: %+v", err)

// If we failed to serialize the session variables, lets save the output of
// `SHOW ALL`. This usually happens if the session contains prepared
// statements or portals that cause the `serialize_session()` to fail.
//
// Saving the session variables in this manner does not guarantee the test
// will succeed since there are no ordering semantics when we go to apply
// them. There are some session variables that need to be applied before
// others for them to be valid. Thus, it is strictly better to use
// `serialize/deserialize_session()`, this "hack" just gives the test one
// more chance to succeed.
userSessionVars := make(map[string]string)
existingSessionVars, err := t.db.Query("SHOW ALL")
if err != nil {
return err
}
for existingSessionVars.Next() {
var key, value string
if err := existingSessionVars.Scan(&key, &value); err != nil {
return errors.Wrap(err, "scanning session variables")
users := make(map[string][]int, len(t.clients))
userToHexSession := make(map[string]map[int]string, len(t.clients))
userToSessionVars := make(map[string]map[int]map[string]string, len(t.clients))
for user, userClients := range t.clients {
userToHexSession[user] = make(map[int]string)
userToSessionVars[user] = make(map[int]map[string]string)
for nodeIdx := range userClients {
users[user] = append(users[user], nodeIdx)
t.setUser(user, nodeIdx)

// Serialize session variables.
var userSession string
var err error
if err = t.db.QueryRow(`SELECT encode(crdb_internal.serialize_session(), 'hex')`).Scan(&userSession); err == nil {
userToHexSession[user][nodeIdx] = userSession
continue
}
log.Warningf(context.Background(), "failed to serialize session: %+v", err)

// If we failed to serialize the session variables, lets save the output of
// `SHOW ALL`. This usually happens if the session contains prepared
// statements or portals that cause the `serialize_session()` to fail.
//
// Saving the session variables in this manner does not guarantee the test
// will succeed since there are no ordering semantics when we go to apply
// them. There are some session variables that need to be applied before
// others for them to be valid. Thus, it is strictly better to use
// `serialize/deserialize_session()`, this "hack" just gives the test one
// more chance to succeed.
userSessionVars := make(map[string]string)
existingSessionVars, err := t.db.Query("SHOW ALL")
if err != nil {
return err
}
userSessionVars[key] = value
for existingSessionVars.Next() {
var key, value string
if err := existingSessionVars.Scan(&key, &value); err != nil {
return errors.Wrap(err, "scanning session variables")
}
userSessionVars[key] = value
}
userToSessionVars[user][nodeIdx] = userSessionVars
}
userToSessionVars[user] = userSessionVars
}

backupLocation := fmt.Sprintf("gs://cockroachdb-backup-testing/logic-test-backup-restore-nightly/%s?AUTH=implicit",
strconv.FormatInt(timeutil.Now().UnixNano(), 10))

// Perform the backup and restore as root.
t.setUser(username.RootUser, 0 /* nodeIdxOverride */)
t.setUser(username.RootUser, 0 /* nodeIdx */)

if _, err := t.db.Exec(fmt.Sprintf("BACKUP INTO '%s'", backupLocation)); err != nil {
return errors.Wrap(err, "backing up cluster")
Expand All @@ -2282,7 +2293,7 @@ func (t *logicTest) maybeBackupRestore(
t.resetCluster()

// Run the restore as root.
t.setUser(username.RootUser, 0 /* nodeIdxOverride */)
t.setUser(username.RootUser, 0 /* nodeIdx */)
if _, err := t.db.Exec(fmt.Sprintf("RESTORE FROM LATEST IN '%s'", backupLocation)); err != nil {
return errors.Wrap(err, "restoring cluster")
}
Expand All @@ -2291,29 +2302,31 @@ func (t *logicTest) maybeBackupRestore(

// Create new connections for the existing users, and restore the session
// variables that we collected.
for _, user := range users {
// Call setUser for every user to create the connection for that user.
t.setUser(user, 0 /* nodeIdxOverride */)

if userSession, ok := userToHexSession[user]; ok {
if _, err := t.db.Exec(fmt.Sprintf(`SELECT crdb_internal.deserialize_session(decode('%s', 'hex'))`, userSession)); err != nil {
return errors.Wrapf(err, "deserializing session")
}
} else if vars, ok := userToSessionVars[user]; ok {
// We now attempt to restore the session variables that were set on the
// backing up cluster. These are not included in the backup restore and so
// have to be restored manually.
for key, value := range vars {
// First try setting the cluster setting as a string.
if _, err := t.db.Exec(fmt.Sprintf("SET %s='%s'", key, value)); err != nil {
// If it fails, try setting the value as an int.
log.Infof(context.Background(), "setting session variable as string failed (err: %v), trying as int", pretty.Formatter(err))
if _, err := t.db.Exec(fmt.Sprintf("SET %s=%s", key, value)); err != nil {
// Some cluster settings can't be set at all, so ignore these errors.
// If a setting that we needed could not be restored, we expect the
// logic test to fail and let us know.
log.Infof(context.Background(), "setting session variable as int failed: %v (continuing anyway)", pretty.Formatter(err))
continue
for user, userNodeIdxs := range users {
for _, nodeIdx := range userNodeIdxs {
// Call setUser for every user to create the connection for that user.
t.setUser(user, nodeIdx)

if userSession, ok := userToHexSession[user][nodeIdx]; ok {
if _, err := t.db.Exec(fmt.Sprintf(`SELECT crdb_internal.deserialize_session(decode('%s', 'hex'))`, userSession)); err != nil {
return errors.Wrapf(err, "deserializing session")
}
} else if vars, ok := userToSessionVars[user][nodeIdx]; ok {
// We now attempt to restore the session variables that were set on the
// backing up cluster. These are not included in the backup restore and so
// have to be restored manually.
for key, value := range vars {
// First try setting the cluster setting as a string.
if _, err := t.db.Exec(fmt.Sprintf("SET %s='%s'", key, value)); err != nil {
// If it fails, try setting the value as an int.
log.Infof(context.Background(), "setting session variable as string failed (err: %v), trying as int", pretty.Formatter(err))
if _, err := t.db.Exec(fmt.Sprintf("SET %s=%s", key, value)); err != nil {
// Some cluster settings can't be set at all, so ignore these errors.
// If a setting that we needed could not be restored, we expect the
// logic test to fail and let us know.
log.Infof(context.Background(), "setting session variable as int failed: %v (continuing anyway)", pretty.Formatter(err))
continue
}
}
}
}
Expand Down Expand Up @@ -2547,6 +2560,7 @@ func (t *logicTest) processSubtest(
case "query":
var query logicQuery
query.pos = fmt.Sprintf("\n%s:%d", path, s.Line+subtest.lineLineIndexIntoFile)
query.nodeIdx = t.nodeIdx
// Parse "query error <regexp>"
if m := errorRE.FindStringSubmatch(s.Text()); m != nil {
query.expectErrCode = m[1]
Expand Down Expand Up @@ -3270,7 +3284,7 @@ func (t *logicTest) execQuery(query logicQuery) error {

db := t.db
var closeDB func()
if query.nodeIdx != 0 {
if query.nodeIdx != t.nodeIdx {
var pgURL url.URL
if t.testserverCluster != nil {
pgURL = *t.testserverCluster.PGURLForNode(query.nodeIdx)
Expand Down Expand Up @@ -3804,16 +3818,18 @@ func (t *logicTest) validateAfterTestCompletion() error {
}

// Close all clients other than "root"
for username, c := range t.clients {
if username == "root" {
for user, userClients := range t.clients {
if user == username.RootUser {
continue
}
delete(t.clients, username)
if err := c.Close(); err != nil {
t.Fatalf("failed to close connection for user %s: %v", username, err)
for i, c := range userClients {
if err := c.Close(); err != nil {
t.Fatalf("failed to close connection to node %d for user %s: %v", i, user, err)
}
}
delete(t.clients, user)
}
t.setUser("root", 0 /* nodeIdxOverride */)
t.setUser(username.RootUser, 0 /* nodeIdx */)

// Some cleanup to make sure the following validation queries can run
// successfully. First we rollback in case the logic test had an uncommitted
Expand Down

0 comments on commit 9470850

Please sign in to comment.