Skip to content

Commit

Permalink
workload: fix missing Err() checks on sql.Query usages
Browse files Browse the repository at this point in the history
When calling Next until it returns false there could be error or it
could be the end of results.  Err must be called to see which happened.
In lots of cases we return errors early and may not ever call Err so
use defer to always call Close.

This fixes all but one of the cases found by the rowserrcheck linter,
there's one false positive in tpcds that appears to be caused by go
routine usage:

pkg/workload/tpcds/tpcds.go:311:26: rows.Err must be checked

Also there are 100+ lint failures in pkg/sql/... test code.  A follow
on PR will investigate how to turn on linting globally.

Release justification: fixes hidden errors in test code
Release note: None
Fixes #68164
  • Loading branch information
cucaroach committed Sep 10, 2021
1 parent 108c410 commit c5cac34
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 85 deletions.
47 changes: 24 additions & 23 deletions pkg/workload/querylog/querylog.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,12 +493,12 @@ func (w *worker) generatePlaceholders(

// getTableNames fetches the names of all the tables in db and stores them in
// w.state.
func (w *querylog) getTableNames(db *gosql.DB) error {
func (w *querylog) getTableNames(db *gosql.DB) (retErr error) {
rows, err := db.Query(`SELECT table_name FROM [SHOW TABLES] ORDER BY table_name`)
if err != nil {
return err
}
defer rows.Close()
defer func() { retErr = errors.CombineErrors(retErr, rows.Close()) }()
w.state.tableNames = make([]string, 0)
for rows.Next() {
var tableName string
Expand All @@ -507,7 +507,7 @@ func (w *querylog) getTableNames(db *gosql.DB) error {
}
w.state.tableNames = append(w.state.tableNames, tableName)
}
return nil
return rows.Err()
}

// unzip unzips the zip file at src into dest. It was copied (with slight
Expand Down Expand Up @@ -726,7 +726,7 @@ func (w *querylog) processQueryLog(ctx context.Context) error {

// getColumnsInfo populates the information about the columns of the tables
// that at least one query was issued against.
func (w *querylog) getColumnsInfo(db *gosql.DB) error {
func (w *querylog) getColumnsInfo(db *gosql.DB) (retErr error) {
w.state.columnsByTableName = make(map[string][]columnInfo)
for _, tableName := range w.state.tableNames {
if !w.state.tableUsed[tableName] {
Expand All @@ -743,14 +743,17 @@ func (w *querylog) getColumnsInfo(db *gosql.DB) error {
if err != nil {
return err
}
defer func(rows *gosql.Rows) {
retErr = errors.CombineErrors(retErr, rows.Close())
}(rows)
for rows.Next() {
var columnName, dataType string
if err = rows.Scan(&columnName, &dataType); err != nil {
return err
}
columnTypeByColumnName[columnName] = dataType
}
if err = rows.Close(); err != nil {
if err = rows.Err(); err != nil {
return err
}

Expand All @@ -771,11 +774,13 @@ WHERE attrelid=$1`, relid)
if err != nil {
return err
}
defer func(rows *gosql.Rows) {
retErr = errors.CombineErrors(retErr, rows.Close())
}(rows)

var cols []columnInfo
var numCols = 0

defer rows.Close()
for rows.Next() {
var c columnInfo
c.dataPrecision = 0
Expand All @@ -797,7 +802,9 @@ WHERE attrelid=$1`, relid)
cols = append(cols, c)
numCols++
}

if err = rows.Err(); err != nil {
return err
}
if numCols == 0 {
return errors.Errorf("no columns detected")
}
Expand All @@ -809,7 +816,7 @@ WHERE attrelid=$1`, relid)
// populateSamples selects at most w.numSamples of samples from each table that
// at least one query was issued against the query log. The samples are stored
// inside corresponding to the table columnInfo.
func (w *querylog) populateSamples(ctx context.Context, db *gosql.DB) error {
func (w *querylog) populateSamples(ctx context.Context, db *gosql.DB) (retErr error) {
log.Infof(ctx, "Populating samples started")
for _, tableName := range w.state.tableNames {
cols := w.state.columnsByTableName[tableName]
Expand All @@ -818,16 +825,9 @@ func (w *querylog) populateSamples(ctx context.Context, db *gosql.DB) error {
continue
}
log.Infof(ctx, "Sampling %s", tableName)
count, err := db.Query(fmt.Sprintf(`SELECT count(*) FROM %s`, tableName))
if err != nil {
return err
}
count.Next()
row := db.QueryRow(fmt.Sprintf(`SELECT count(*) FROM %s`, tableName))
var numRows int
if err = count.Scan(&numRows); err != nil {
return err
}
if err = count.Close(); err != nil {
if err := row.Scan(&numRows); err != nil {
return err
}

Expand All @@ -840,23 +840,24 @@ func (w *querylog) populateSamples(ctx context.Context, db *gosql.DB) error {
}
columnsOrdered := strings.Join(columnNames, ", ")

var samples *gosql.Rows
var samplesQuery string
if w.numSamples > numRows {
samples, err = db.Query(
fmt.Sprintf(`SELECT %s FROM %s`, columnsOrdered, tableName))
samplesQuery = fmt.Sprintf(`SELECT %s FROM %s`, columnsOrdered, tableName)
} else {
samplingProb := float64(w.numSamples) / float64(numRows)
if samplingProb < w.minSamplingProb {
// To speed up the query.
samplingProb = w.minSamplingProb
}
samples, err = db.Query(fmt.Sprintf(`SELECT %s FROM %s WHERE random() < %f LIMIT %d`,
columnsOrdered, tableName, samplingProb, w.numSamples))
samplesQuery = fmt.Sprintf(`SELECT %s FROM %s WHERE random() < %f LIMIT %d`,
columnsOrdered, tableName, samplingProb, w.numSamples)
}

samples, err := db.Query(samplesQuery)
if err != nil {
return err
}
defer func() { retErr = errors.CombineErrors(retErr, samples.Close()) }()
for samples.Next() {
rowOfSamples := make([]interface{}, len(cols))
for i := range rowOfSamples {
Expand All @@ -869,7 +870,7 @@ func (w *querylog) populateSamples(ctx context.Context, db *gosql.DB) error {
cols[i].samples = append(cols[i].samples, sample)
}
}
if err = samples.Close(); err != nil {
if err = samples.Err(); err != nil {
return err
}
}
Expand Down
16 changes: 11 additions & 5 deletions pkg/workload/rand/rand.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type col struct {
// Ops implements the Opser interface.
func (w *random) Ops(
ctx context.Context, urls []string, reg *histogram.Registry,
) (workload.QueryLoad, error) {
) (ql workload.QueryLoad, retErr error) {
sqlDatabase, err := workload.SanitizeUrls(w, w.connFlags.DBOverride, urls)
if err != nil {
return workload.QueryLoad{}, err
Expand Down Expand Up @@ -147,11 +147,10 @@ WHERE attrelid=$1`, relid)
if err != nil {
return workload.QueryLoad{}, err
}

defer func() { retErr = errors.CombineErrors(retErr, rows.Close()) }()
var cols []col
var numCols = 0

defer rows.Close()
for rows.Next() {
var c col
c.dataPrecision = 0
Expand All @@ -177,6 +176,10 @@ WHERE attrelid=$1`, relid)
return workload.QueryLoad{}, errors.New("no columns detected")
}

if err = rows.Err(); err != nil {
return workload.QueryLoad{}, err
}

// insert on conflict requires the primary key. check information_schema if not specified on the command line
if strings.HasPrefix(w.method, "ioc") && w.primaryKey == "" {
rows, err := db.Query(
Expand All @@ -190,7 +193,7 @@ AND i.indisprimary`, relid)
if err != nil {
return workload.QueryLoad{}, err
}
defer rows.Close()
defer func() { retErr = errors.CombineErrors(retErr, rows.Close()) }()
for rows.Next() {
var colname string

Expand All @@ -203,6 +206,9 @@ AND i.indisprimary`, relid)
w.primaryKey += colname
}
}
if err = rows.Err(); err != nil {
return workload.QueryLoad{}, err
}
}

if strings.HasPrefix(w.method, "ioc") && w.primaryKey == "" {
Expand Down Expand Up @@ -276,7 +282,7 @@ AND i.indisprimary`, relid)
return workload.QueryLoad{}, err
}

ql := workload.QueryLoad{SQLDatabase: sqlDatabase}
ql = workload.QueryLoad{SQLDatabase: sqlDatabase}

for i := 0; i < w.connFlags.Concurrency; i++ {
op := randOp{
Expand Down
Loading

0 comments on commit c5cac34

Please sign in to comment.