Skip to content

Commit

Permalink
roachtest: harden and extend cancel roachtest
Browse files Browse the repository at this point in the history
This commit hardens the `cancel` roachtest. In particular, this test
involves two goroutines: the runner that is executing a longer-running
TPCH query and the main goroutine that cancels that query. Previously,
in order to ensure that the main goroutine attempts to cancel the query
at the right moment, we slept for 250ms. Then, we would cancel all
running non-internal queries other than `SHOW CLUSTER QUERIES` itself.
This was problematic for a couple of reasons:
- the TPCH query might not have started yet (due to some Go scheduling
delays)
- we could actually try to cancel one of the setup queries (the runner
does `USE tpch;` and `SET distsql = off;` before running the TPCH
query).

In order to address the first reason, this commit adjusts the runner to
notify the main goroutine only after the setup queries are done and
introduces a polling loop to wait until the TPCH query shows up. That
polling loop will now sleep for a random duration of up to 1000ms
(in order to improve the test coverage of both the optimizer and the
execution engine). Note that we only check that the cancellation
occurred within 3s (used to be 5s before this commit), so we don't
sufficiently exercise the optimizer cancellation (which isn't the
primary goal of this test anyway).

The second reason is addressed by blocking the main goroutine until the
setup queries are done.

Release note: None
  • Loading branch information
yuzefovich committed Jun 29, 2023
1 parent 4a614f8 commit f80b346
Showing 1 changed file with 93 additions and 51 deletions.
144 changes: 93 additions & 51 deletions pkg/cmd/roachtest/tests/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload/tpch"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -58,63 +59,104 @@ func registerCancel(r registry.Registry) {
t.Fatal(err)
}

queryPrefix := "USE tpch; "
if !useDistsql {
queryPrefix += "SET distsql = off; "
}

t.Status("running queries to cancel")
rng, _ := randutil.NewTestRand()
for _, queryNum := range tpchQueriesToRun {
// sem is used to indicate that the query-runner goroutine has
// been spawned up.
sem := make(chan struct{})
// Any error regarding the cancellation (or of its absence) will
// be sent on errCh.
errCh := make(chan error, 1)
go func(queryNum int) {
defer close(errCh)
query := tpch.QueriesByNumber[queryNum]
t.L().Printf("executing q%d\n", queryNum)
sem <- struct{}{}
close(sem)
_, err := conn.Exec(queryPrefix + query)
if err == nil {
errCh <- errors.New("query completed before it could be canceled")
} else {
fmt.Printf("query failed with error: %s\n", err)
// Note that errors.Is() doesn't work here because
// lib/pq wraps the query canceled error.
if !strings.Contains(err.Error(), cancelchecker.QueryCanceledError.Error()) {
errCh <- errors.Wrap(err, "unexpected error")
// Run each query 5 times to increase the test coverage.
for run := 0; run < 5; run++ {
// sem is used to indicate that the query-runner goroutine
// has been spawned up and has done preliminary setup.
sem := make(chan struct{})
// An error will always be sent on errCh by the runner
// (either query execution error or an error indicating the
// absence of expected cancellation error).
errCh := make(chan error, 1)
go func(queryNum int) {
runnerConn := c.Conn(ctx, t.L(), 1)
defer runnerConn.Close()
setupQueries := []string{"USE tpch;"}
if !useDistsql {
setupQueries = append(setupQueries, "SET distsql = off;")
}
for _, setupQuery := range setupQueries {
t.L().Printf("executing setup query %q", setupQuery)
if _, err := runnerConn.Exec(setupQuery); err != nil {
errCh <- err
close(sem)
return
}
}
query := tpch.QueriesByNumber[queryNum]
t.L().Printf("executing q%d\n", queryNum)
close(sem)
_, err := runnerConn.Exec(query)
if err == nil {
err = errors.New("query completed before it could be canceled")
}
errCh <- err
}(queryNum)

// Wait for the query-runner goroutine to start as well as
// to execute setup queries.
<-sem

// Continuously poll until we get the queryID that we want
// to cancel. We expect it to show up within 10 seconds.
var queryID, query string
timeoutCh := time.After(10 * time.Second)
pollingStartTime := timeutil.Now()
for {
// Sleep for some random duration up to 1000ms. This
// allows us to sometimes find the query when it's in
// the planning stage while in most cases it's in the
// execution stage already.
toSleep := time.Duration(rng.Intn(1001)) * time.Millisecond
t.Status(fmt.Sprintf("sleeping for %s", toSleep))
time.Sleep(toSleep)
rows, err := conn.Query(`SELECT query_id, query FROM [SHOW CLUSTER QUERIES] WHERE query NOT LIKE '%SHOW CLUSTER QUERIES%'`)
if err != nil {
t.Fatal(err)
}
if rows.Next() {
if err = rows.Scan(&queryID, &query); err != nil {
t.Fatal(err)
}
break
}
if err = rows.Close(); err != nil {
t.Fatal(err)
}
select {
case err = <-errCh:
t.Fatalf("received an error from the runner goroutine before the query could be canceled: %v", err)
case <-timeoutCh:
t.Fatal(errors.New("didn't see the query to cancel within 10 seconds"))
default:
}
}
}(queryNum)

// Wait for the query-runner goroutine to start.
<-sem

// The cancel query races with the execution of the query it's trying to
// cancel, which may result in attempting to cancel the query before it
// has started. To be more confident that the query is executing, wait
// a bit before attempting to cancel it.
time.Sleep(250 * time.Millisecond)

const cancelQuery = `CANCEL QUERIES
SELECT query_id FROM [SHOW CLUSTER QUERIES] WHERE query not like '%SHOW CLUSTER QUERIES%'`
c.Run(ctx, c.Node(1), `./cockroach sql --insecure -e "`+cancelQuery+`"`)
cancelStartTime := timeutil.Now()

select {
case err, ok := <-errCh:
if ok {
t.Fatal(err)

t.Status(fmt.Sprintf("canceling the query after waiting for %s", timeutil.Since(pollingStartTime)))
_, err := conn.Exec(`CANCEL QUERY $1`, queryID)
if err != nil {
t.Status(fmt.Sprintf("%s: %q", queryID, query))
t.Fatalf("encountered an error when canceling %q with queryID=%s: %v", query, queryID, err)
}
// If errCh is closed, then the cancellation was successful.
timeToCancel := timeutil.Since(cancelStartTime)
fmt.Printf("canceling q%d took %s\n", queryNum, timeToCancel)
cancelStartTime := timeutil.Now()

case <-time.After(5 * time.Second):
t.Fatal("query took too long to respond to cancellation")
select {
case err := <-errCh:
t.Status(err)
if !strings.Contains(err.Error(), cancelchecker.QueryCanceledError.Error()) {
// Note that errors.Is() doesn't work here because
// lib/pq wraps the query canceled error.
t.Fatal(errors.Wrap(err, "unexpected error"))
}
timeToCancel := timeutil.Since(cancelStartTime)
t.Status(fmt.Sprintf("canceling q%d took %s\n", queryNum, timeToCancel))

case <-time.After(3 * time.Second):
t.Fatal("query took too long to respond to cancellation")
}
}
}

Expand Down

0 comments on commit f80b346

Please sign in to comment.