Skip to content

Commit

Permalink
sql: deflake and unskip TestTenantStatementTimeoutAdmissionQueueCancelation
Browse files Browse the repository at this point in the history

Fixes: #78494
  • Loading branch information
cucaroach committed Mar 28, 2023
1 parent 3f113fc commit 83ce72c
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 14 deletions.
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,7 @@ go_test(
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_lib_pq//:pq",
"@com_github_lib_pq//oid",
"@com_github_petermattis_goid//:goid",
"@com_github_pmezard_go_difflib//difflib",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
52 changes: 38 additions & 14 deletions pkg/sql/run_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"net/url"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -40,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/petermattis/goid"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -836,7 +838,6 @@ func getUserConn(t *testing.T, username string, server serverutils.TestServerInt
// main statement with a timeout is blocked.
func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 78494, "flaky test")
defer log.Scope(t).Close(t)

skip.UnderStress(t, "times out under stress")
Expand All @@ -845,8 +846,9 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tenantID := serverutils.TestTenantID()

sawit := false
numBlockers := 4
var matches int64

// We can't get the tableID programmatically here; it is checked below with an assert.
const tableID = 104
Expand All @@ -865,9 +867,12 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) {
matchBatch := func(ctx context.Context, req *kvpb.BatchRequest) bool {
tid, ok := roachpb.ClientTenantFromContext(ctx)
if ok && tid == tenantID && len(req.Requests) > 0 {
scan, ok := req.Requests[0].GetInner().(*kvpb.ScanRequest)
if ok && tableSpan.ContainsKey(scan.Key) {
return true
scan, ok := req.Requests[0].GetInner().(*kvpb.GetRequest)
if ok {
if tableSpan.ContainsKey(scan.Key) {
log.Infof(ctx, "matchBatch %d", goid.Get())
return true
}
}
}
return false
Expand All @@ -885,18 +890,31 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) {
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, req *kvpb.BatchRequest) *kvpb.Error {
if matchBatch(ctx, req) {
m := atomic.AddInt64(&matches, 1)
// If any of the blockers gets retried, let the retry through without blocking.
if m > int64(numBlockers) {
log.Infof(ctx, "ignoring extra blocker %d", goid.Get())
return nil
}
// Notify we're blocking.
log.Infof(ctx, "blocking %d", goid.Get())
unblockClientCh <- struct{}{}
<-qBlockersCh
}
return nil
},
TestingResponseErrorEvent: func(ctx context.Context, req *kvpb.BatchRequest, err error) {
if matchBatch(ctx, req) {
tid, ok := roachpb.ClientTenantFromContext(ctx)
if ok && tid == tenantID && len(req.Requests) > 0 {
scan, ok := req.Requests[0].GetInner().(*kvpb.ScanRequest)
if ok && tableSpan.ContainsKey(scan.Key) {
cancel()
wg.Done()
log.Infof(ctx, "%s %d", scan, goid.Get())
if ok {
if tableSpan.ContainsKey(scan.Key) && !sawit {
sawit = true
log.Infof(ctx, "got scan request error %d", goid.Get())
cancel()
wg.Done()
}
}
}
},
Expand All @@ -911,8 +929,8 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) {
defer db.Close()

r1 := sqlutils.MakeSQLRunner(db)
r1.Exec(t, `CREATE TABLE foo (t int)`)

r1.Exec(t, `SET CLUSTER SETTING sql.stats.automatic_collection.enabled=false`)
r1.Exec(t, `CREATE TABLE foo (t int PRIMARY KEY)`)
row := r1.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'foo'`)
var id int64
row.Scan(&id)
Expand All @@ -931,18 +949,24 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) {
for _, r := range blockers {
go func(r *sqlutils.SQLRunner) {
defer wg.Done()
r.Exec(t, `SELECT * FROM foo`)
r.Exec(t, `SELECT * FROM foo WHERE t = 1234`)
}(r)
}
// Wait till all blockers are parked.
for i := 0; i < numBlockers; i++ {
<-unblockClientCh
}
client.ExpectErr(t, "timeout", `SELECT * FROM foo`)
// Unblock the blockers.
log.Infof(ctx, "blockers parked")
// Because we don't know when the statement timeout will fire, we have to
// repeat the query until one attempt makes it into the KV layer.
for !sawit {
_, err := client.DB.ExecContext(context.Background(), `SELECT * FROM foo`)
log.Infof(ctx, "main req finished: %v, %t", err, sawit)
}
for i := 0; i < numBlockers; i++ {
qBlockersCh <- struct{}{}
}
log.Infof(ctx, "unblocked blockers")
wg.Wait()
require.ErrorIs(t, ctx.Err(), context.Canceled)
}

0 comments on commit 83ce72c

Please sign in to comment.