Skip to content

Commit

Permalink
Merge pull request #110896 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-22.2-105569
  • Loading branch information
yuzefovich authored Sep 19, 2023
2 parents 59888c3 + d023a2f commit 92179ad
Showing 1 changed file with 93 additions and 51 deletions.
144 changes: 93 additions & 51 deletions pkg/cmd/roachtest/tests/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload/tpch"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -58,63 +59,104 @@ func registerCancel(r registry.Registry) {
t.Fatal(err)
}

queryPrefix := "USE tpch; "
if !useDistsql {
queryPrefix += "SET distsql = off; "
}

t.Status("running queries to cancel")
rng, _ := randutil.NewTestRand()
for _, queryNum := range tpchQueriesToRun {
// sem is used to indicate that the query-runner goroutine has
// been spawned up.
sem := make(chan struct{})
// Any error regarding the cancellation (or of its absence) will
// be sent on errCh.
errCh := make(chan error, 1)
go func(queryNum int) {
defer close(errCh)
query := tpch.QueriesByNumber[queryNum]
t.L().Printf("executing q%d\n", queryNum)
sem <- struct{}{}
close(sem)
_, err := conn.Exec(queryPrefix + query)
if err == nil {
errCh <- errors.New("query completed before it could be canceled")
} else {
fmt.Printf("query failed with error: %s\n", err)
// Note that errors.Is() doesn't work here because
// lib/pq wraps the query canceled error.
if !strings.Contains(err.Error(), cancelchecker.QueryCanceledError.Error()) {
errCh <- errors.Wrap(err, "unexpected error")
// Run each query 5 times to increase the test coverage.
for run := 0; run < 5; run++ {
// sem is used to indicate that the query-runner goroutine
// has been spawned up and has done preliminary setup.
sem := make(chan struct{})
// An error will always be sent on errCh by the runner
// (either query execution error or an error indicating the
// absence of expected cancellation error).
errCh := make(chan error, 1)
go func(queryNum int) {
runnerConn := c.Conn(ctx, t.L(), 1)
defer runnerConn.Close()
setupQueries := []string{"USE tpch;"}
if !useDistsql {
setupQueries = append(setupQueries, "SET distsql = off;")
}
for _, setupQuery := range setupQueries {
t.L().Printf("executing setup query %q", setupQuery)
if _, err := runnerConn.Exec(setupQuery); err != nil {
errCh <- err
close(sem)
return
}
}
query := tpch.QueriesByNumber[queryNum]
t.L().Printf("executing q%d\n", queryNum)
close(sem)
_, err := runnerConn.Exec(query)
if err == nil {
err = errors.New("query completed before it could be canceled")
}
errCh <- err
}(queryNum)

// Wait for the query-runner goroutine to start as well as
// to execute setup queries.
<-sem

// Continuously poll until we get the queryID that we want
// to cancel. We expect it to show up within 10 seconds.
var queryID, query string
timeoutCh := time.After(10 * time.Second)
pollingStartTime := timeutil.Now()
for {
// Sleep for some random duration up to 1000ms. This
// allows us to sometimes find the query when it's in
// the planning stage while in most cases it's in the
// execution stage already.
toSleep := time.Duration(rng.Intn(1001)) * time.Millisecond
t.Status(fmt.Sprintf("sleeping for %s", toSleep))
time.Sleep(toSleep)
rows, err := conn.Query(`SELECT query_id, query FROM [SHOW CLUSTER QUERIES] WHERE query NOT LIKE '%SHOW CLUSTER QUERIES%'`)
if err != nil {
t.Fatal(err)
}
if rows.Next() {
if err = rows.Scan(&queryID, &query); err != nil {
t.Fatal(err)
}
break
}
if err = rows.Close(); err != nil {
t.Fatal(err)
}
select {
case err = <-errCh:
t.Fatalf("received an error from the runner goroutine before the query could be canceled: %v", err)
case <-timeoutCh:
t.Fatal(errors.New("didn't see the query to cancel within 10 seconds"))
default:
}
}
}(queryNum)

// Wait for the query-runner goroutine to start.
<-sem

// The cancel query races with the execution of the query it's trying to
// cancel, which may result in attempting to cancel the query before it
// has started. To be more confident that the query is executing, wait
// a bit before attempting to cancel it.
time.Sleep(250 * time.Millisecond)

const cancelQuery = `CANCEL QUERIES
SELECT query_id FROM [SHOW CLUSTER QUERIES] WHERE query not like '%SHOW CLUSTER QUERIES%'`
c.Run(ctx, c.Node(1), `./cockroach sql --insecure -e "`+cancelQuery+`"`)
cancelStartTime := timeutil.Now()

select {
case err, ok := <-errCh:
if ok {
t.Fatal(err)

t.Status(fmt.Sprintf("canceling the query after waiting for %s", timeutil.Since(pollingStartTime)))
_, err := conn.Exec(`CANCEL QUERY $1`, queryID)
if err != nil {
t.Status(fmt.Sprintf("%s: %q", queryID, query))
t.Fatalf("encountered an error when canceling %q with queryID=%s: %v", query, queryID, err)
}
// If errCh is closed, then the cancellation was successful.
timeToCancel := timeutil.Since(cancelStartTime)
fmt.Printf("canceling q%d took %s\n", queryNum, timeToCancel)
cancelStartTime := timeutil.Now()

case <-time.After(5 * time.Second):
t.Fatal("query took too long to respond to cancellation")
select {
case err := <-errCh:
t.Status(err)
if !strings.Contains(err.Error(), cancelchecker.QueryCanceledError.Error()) {
// Note that errors.Is() doesn't work here because
// lib/pq wraps the query canceled error.
t.Fatal(errors.Wrap(err, "unexpected error"))
}
timeToCancel := timeutil.Since(cancelStartTime)
t.Status(fmt.Sprintf("canceling q%d took %s\n", queryNum, timeToCancel))

case <-time.After(3 * time.Second):
t.Fatal("query took too long to respond to cancellation")
}
}
}

Expand Down

0 comments on commit 92179ad

Please sign in to comment.