diff --git a/pkg/cmd/roachtest/tests/cancel.go b/pkg/cmd/roachtest/tests/cancel.go index b73d5536072a..00ae935e80c0 100644 --- a/pkg/cmd/roachtest/tests/cancel.go +++ b/pkg/cmd/roachtest/tests/cancel.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/roachprod/install" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload/tpch" "github.com/cockroachdb/errors" @@ -58,63 +59,104 @@ func registerCancel(r registry.Registry) { t.Fatal(err) } - queryPrefix := "USE tpch; " - if !useDistsql { - queryPrefix += "SET distsql = off; " - } - t.Status("running queries to cancel") + rng, _ := randutil.NewTestRand() for _, queryNum := range tpchQueriesToRun { - // sem is used to indicate that the query-runner goroutine has - // been spawned up. - sem := make(chan struct{}) - // Any error regarding the cancellation (or of its absence) will - // be sent on errCh. - errCh := make(chan error, 1) - go func(queryNum int) { - defer close(errCh) - query := tpch.QueriesByNumber[queryNum] - t.L().Printf("executing q%d\n", queryNum) - sem <- struct{}{} - close(sem) - _, err := conn.Exec(queryPrefix + query) - if err == nil { - errCh <- errors.New("query completed before it could be canceled") - } else { - fmt.Printf("query failed with error: %s\n", err) - // Note that errors.Is() doesn't work here because - // lib/pq wraps the query canceled error. - if !strings.Contains(err.Error(), cancelchecker.QueryCanceledError.Error()) { - errCh <- errors.Wrap(err, "unexpected error") + // Run each query 5 times to increase the test coverage. + for run := 0; run < 5; run++ { + // sem is used to indicate that the query-runner goroutine + // has been spawned up and has done preliminary setup. + sem := make(chan struct{}) + // An error will always be sent on errCh by the runner + // (either query execution error or an error indicating the + // absence of expected cancellation error). + errCh := make(chan error, 1) + go func(queryNum int) { + runnerConn := c.Conn(ctx, t.L(), 1) + defer runnerConn.Close() + setupQueries := []string{"USE tpch;"} + if !useDistsql { + setupQueries = append(setupQueries, "SET distsql = off;") + } + for _, setupQuery := range setupQueries { + t.L().Printf("executing setup query %q", setupQuery) + if _, err := runnerConn.Exec(setupQuery); err != nil { + errCh <- err + close(sem) + return + } + } + query := tpch.QueriesByNumber[queryNum] + t.L().Printf("executing q%d\n", queryNum) + close(sem) + _, err := runnerConn.Exec(query) + if err == nil { + err = errors.New("query completed before it could be canceled") + } + errCh <- err + }(queryNum) + + // Wait for the query-runner goroutine to start as well as + // to execute setup queries. + <-sem + + // Continuously poll until we get the queryID that we want + // to cancel. We expect it to show up within 10 seconds. + var queryID, query string + timeoutCh := time.After(10 * time.Second) + pollingStartTime := timeutil.Now() + for { + // Sleep for some random duration up to 1000ms. This + // allows us to sometimes find the query when it's in + // the planning stage while in most cases it's in the + // execution stage already. + toSleep := time.Duration(rng.Intn(1001)) * time.Millisecond + t.Status(fmt.Sprintf("sleeping for %s", toSleep)) + time.Sleep(toSleep) + rows, err := conn.Query(`SELECT query_id, query FROM [SHOW CLUSTER QUERIES] WHERE query NOT LIKE '%SHOW CLUSTER QUERIES%'`) + if err != nil { + t.Fatal(err) + } + if rows.Next() { + if err = rows.Scan(&queryID, &query); err != nil { + t.Fatal(err) + } + break + } + if err = rows.Close(); err != nil { + t.Fatal(err) + } + select { + case err = <-errCh: + t.Fatalf("received an error from the runner goroutine before the query could be canceled: %v", err) + case <-timeoutCh: + t.Fatal(errors.New("didn't see the query to cancel within 10 seconds")) + default: } } - }(queryNum) - - // Wait for the query-runner goroutine to start. - <-sem - - // The cancel query races with the execution of the query it's trying to - // cancel, which may result in attempting to cancel the query before it - // has started. To be more confident that the query is executing, wait - // a bit before attempting to cancel it. - time.Sleep(250 * time.Millisecond) - - const cancelQuery = `CANCEL QUERIES - SELECT query_id FROM [SHOW CLUSTER QUERIES] WHERE query not like '%SHOW CLUSTER QUERIES%'` - c.Run(ctx, c.Node(1), `./cockroach sql --insecure -e "`+cancelQuery+`"`) - cancelStartTime := timeutil.Now() - - select { - case err, ok := <-errCh: - if ok { - t.Fatal(err) + + t.Status(fmt.Sprintf("canceling the query after waiting for %s", timeutil.Since(pollingStartTime))) + _, err := conn.Exec(`CANCEL QUERY $1`, queryID) + if err != nil { + t.Status(fmt.Sprintf("%s: %q", queryID, query)) + t.Fatalf("encountered an error when canceling %q with queryID=%s: %v", query, queryID, err) } - // If errCh is closed, then the cancellation was successful. - timeToCancel := timeutil.Since(cancelStartTime) - fmt.Printf("canceling q%d took %s\n", queryNum, timeToCancel) + cancelStartTime := timeutil.Now() - case <-time.After(5 * time.Second): - t.Fatal("query took too long to respond to cancellation") + select { + case err := <-errCh: + t.Status(err) + if !strings.Contains(err.Error(), cancelchecker.QueryCanceledError.Error()) { + // Note that errors.Is() doesn't work here because + // lib/pq wraps the query canceled error. + t.Fatal(errors.Wrap(err, "unexpected error")) + } + timeToCancel := timeutil.Since(cancelStartTime) + t.Status(fmt.Sprintf("canceling q%d took %s\n", queryNum, timeToCancel)) + + case <-time.After(3 * time.Second): + t.Fatal("query took too long to respond to cancellation") + } } }