Skip to content

Commit

Permalink
logictest: change logicTest to store per-node clients for users
Browse files Browse the repository at this point in the history
This patch changes the `clients` field within `logicTest` from type
`map[string]*gosql.DB` to `map[string]map[int]*gosql.DB` in order to
store per-node clients for each user. This allows the `user` cmd in
logic tests to fetch a client for the specified node instead of the
previously used node.

Release note: None
  • Loading branch information
andyyang890 committed Jan 31, 2023
1 parent 91bdcdd commit f0b3c65
Showing 1 changed file with 41 additions and 31 deletions.
72 changes: 41 additions & 31 deletions pkg/sql/logictest/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,8 @@ import (
// Changes the user for subsequent statements or queries.
// If nodeidx is specified, this user will connect to the node
// in the cluster with index N (note this is 0-indexed, while
// node IDs themselves are 1-indexed).
// node IDs themselves are 1-indexed). Otherwise, it will connect
// to the node with index 0 (node ID 1).
//
// - upgrade N
// When using a cockroach-go/testserver logictest, upgrades the node at
Expand Down Expand Up @@ -973,9 +974,9 @@ type logicTest struct {
// If this test uses a SQL tenant server, this is its address. In this case,
// all clients are created against this tenant.
tenantAddrs []string
// map of built clients. Needs to be persisted so that we can
// re-use them and close them all on exit.
clients map[string]*gosql.DB
// map of built clients, keyed first on username and then node idx.
// They are persisted so that they can be reused and also closed upon exit.
clients map[string]map[int]*gosql.DB
// client currently in use. This can change during processing
// of a test input file when encountering the "user" directive.
// see setUser() for details.
Expand Down Expand Up @@ -1140,26 +1141,19 @@ func (t *logicTest) outf(format string, args ...interface{}) {
fmt.Printf("[%s] %s\n", now, msg)
}

// setUser sets the DB client to the specified user.
// setUser sets the DB client to the specified user and connects
// to the node in the cluster at index nodeIdx.
// It returns a cleanup function to be run when the credentials
// are no longer needed.
func (t *logicTest) setUser(user string, nodeIdxOverride int) func() {
if t.clients == nil {
t.clients = map[string]*gosql.DB{}
}
if db, ok := t.clients[user]; ok {
func (t *logicTest) setUser(user string, nodeIdx int) func() {
if db, ok := t.clients[user][nodeIdx]; ok {
t.db = db
t.user = user

// No cleanup necessary, but return a no-op func to avoid nil pointer dereference.
return func() {}
}

nodeIdx := t.nodeIdx
if nodeIdxOverride > 0 {
nodeIdx = nodeIdxOverride
}

var addr string
var pgURL url.URL
var pgUser string
Expand Down Expand Up @@ -1193,9 +1187,16 @@ func (t *logicTest) setUser(user string, nodeIdxOverride int) func() {
if _, err := db.Exec("SET index_recommendations_enabled = false"); err != nil {
t.Fatal(err)
}
t.clients[user] = db
if t.clients == nil {
t.clients = make(map[string]map[int]*gosql.DB)
}
if t.clients[user] == nil {
t.clients[user] = make(map[int]*gosql.DB)
}
t.clients[user][nodeIdx] = db
t.db = db
t.user = pgUser
t.nodeIdx = nodeIdx

return cleanupFunc
}
Expand Down Expand Up @@ -1286,7 +1287,7 @@ func (t *logicTest) newTestServerCluster(bootstrapBinaryPath string, upgradeBina
}

t.testserverCluster = ts
t.clusterCleanupFuncs = append(t.clusterCleanupFuncs, t.setUser(username.RootUser, 0 /* nodeIdxOverride */))
t.clusterCleanupFuncs = append(t.clusterCleanupFuncs, t.setUser(username.RootUser, 0 /* nodeIdx */))
t.clusterCleanupFuncs = append(t.clusterCleanupFuncs, ts.Stop)
}

Expand Down Expand Up @@ -1737,7 +1738,7 @@ func (t *logicTest) newCluster(

// db may change over the lifetime of this function, with intermediate
// values cached in t.clients and finally closed in t.close().
t.clusterCleanupFuncs = append(t.clusterCleanupFuncs, t.setUser(username.RootUser, 0 /* nodeIdxOverride */))
t.clusterCleanupFuncs = append(t.clusterCleanupFuncs, t.setUser(username.RootUser, 0 /* nodeIdx */))
}

// waitForTenantReadOnlyClusterSettingToTakeEffectOrFatal waits until all tenant
Expand Down Expand Up @@ -1790,8 +1791,10 @@ func (t *logicTest) shutdownCluster() {
t.cluster = nil
}
if t.clients != nil {
for _, c := range t.clients {
c.Close()
for _, userClients := range t.clients {
for _, c := range userClients {
c.Close()
}
}
t.clients = nil
}
Expand Down Expand Up @@ -2224,7 +2227,12 @@ func (t *logicTest) processTestFile(path string, config logictestbase.TestCluste
}

func (t *logicTest) hasOpenTxns(ctx context.Context) bool {
for _, user := range t.clients {
for _, userClients := range t.clients {
// Get an arbitrary client for the user.
var user *gosql.DB
for _, user = range userClients {
break
}
existingTxnPriority := "NORMAL"
err := user.QueryRow("SHOW TRANSACTION PRIORITY").Scan(&existingTxnPriority)
if err != nil {
Expand Down Expand Up @@ -2274,7 +2282,7 @@ func (t *logicTest) maybeBackupRestore(

oldUser := t.user
defer func() {
t.setUser(oldUser, 0 /* nodeIdxOverride */)
t.setUser(oldUser, 0 /* nodeIdx */)
}()

log.Info(context.Background(), "Running cluster backup and restore")
Expand All @@ -2290,7 +2298,7 @@ func (t *logicTest) maybeBackupRestore(
userToHexSession := make(map[string]string, len(t.clients))
userToSessionVars := make(map[string]map[string]string, len(t.clients))
for user := range t.clients {
t.setUser(user, 0 /* nodeIdxOverride */)
t.setUser(user, 0 /* nodeIdx */)
users = append(users, user)

// Serialize session variables.
Expand Down Expand Up @@ -2331,7 +2339,7 @@ func (t *logicTest) maybeBackupRestore(
strconv.FormatInt(timeutil.Now().UnixNano(), 10))

// Perform the backup and restore as root.
t.setUser(username.RootUser, 0 /* nodeIdxOverride */)
t.setUser(username.RootUser, 0 /* nodeIdx */)

if _, err := t.db.Exec(fmt.Sprintf("BACKUP INTO '%s'", backupLocation)); err != nil {
return errors.Wrap(err, "backing up cluster")
Expand All @@ -2342,7 +2350,7 @@ func (t *logicTest) maybeBackupRestore(
t.resetCluster()

// Run the restore as root.
t.setUser(username.RootUser, 0 /* nodeIdxOverride */)
t.setUser(username.RootUser, 0 /* nodeIdx */)
if _, err := t.db.Exec(fmt.Sprintf("RESTORE FROM LATEST IN '%s'", backupLocation)); err != nil {
return errors.Wrap(err, "restoring cluster")
}
Expand All @@ -2353,7 +2361,7 @@ func (t *logicTest) maybeBackupRestore(
// variables that we collected.
for _, user := range users {
// Call setUser for every user to create the connection for that user.
t.setUser(user, 0 /* nodeIdxOverride */)
t.setUser(user, 0 /* nodeIdx */)

if userSession, ok := userToHexSession[user]; ok {
if _, err := t.db.Exec(fmt.Sprintf(`SELECT crdb_internal.deserialize_session(decode('%s', 'hex'))`, userSession)); err != nil {
Expand Down Expand Up @@ -3864,16 +3872,18 @@ func (t *logicTest) validateAfterTestCompletion() error {
}

// Close all clients other than "root"
for username, c := range t.clients {
for username, userClients := range t.clients {
if username == "root" {
continue
}
delete(t.clients, username)
if err := c.Close(); err != nil {
t.Fatalf("failed to close connection for user %s: %v", username, err)
for i, c := range userClients {
if err := c.Close(); err != nil {
t.Fatalf("failed to close connection to node %d for user %s: %v", i, username, err)
}
}
delete(t.clients, username)
}
t.setUser("root", 0 /* nodeIdxOverride */)
t.setUser("root", 0 /* nodeIdx */)

// Some cleanup to make sure the following validation queries can run
// successfully. First we rollback in case the logic test had an uncommitted
Expand Down

0 comments on commit f0b3c65

Please sign in to comment.