From 83ce72c050f571c921f528e641707a6af8edcaa2 Mon Sep 17 00:00:00 2001 From: Tommy Reilly Date: Tue, 28 Mar 2023 18:28:12 -0400 Subject: [PATCH] sql: deflake and unskip TestTenantStatementTimeoutAdmissionQueueCancelation Fixes: #78494 --- pkg/sql/BUILD.bazel | 1 + pkg/sql/run_control_test.go | 52 +++++++++++++++++++++++++++---------- 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index dfafaedccefd..c54f6a59361b 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -870,6 +870,7 @@ go_test( "@com_github_jackc_pgx_v4//:pgx", "@com_github_lib_pq//:pq", "@com_github_lib_pq//oid", + "@com_github_petermattis_goid//:goid", "@com_github_pmezard_go_difflib//difflib", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/pkg/sql/run_control_test.go b/pkg/sql/run_control_test.go index 34aaea12f5f2..fbd8c65eb82b 100644 --- a/pkg/sql/run_control_test.go +++ b/pkg/sql/run_control_test.go @@ -18,6 +18,7 @@ import ( "net/url" "strings" "sync" + "sync/atomic" "testing" "time" @@ -40,6 +41,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" + "github.com/petermattis/goid" "github.com/stretchr/testify/require" ) @@ -836,7 +838,6 @@ func getUserConn(t *testing.T, username string, server serverutils.TestServerInt // main statement with a timeout is blocked. func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) { defer leaktest.AfterTest(t)() - skip.WithIssue(t, 78494, "flaky test") defer log.Scope(t).Close(t) skip.UnderStress(t, "times out under stress") @@ -845,8 +846,9 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() tenantID := serverutils.TestTenantID() - + sawit := false numBlockers := 4 + var matches int64 // We can't get the tableID programmatically here, checked below with assert. const tableID = 104 @@ -865,9 +867,12 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) { matchBatch := func(ctx context.Context, req *kvpb.BatchRequest) bool { tid, ok := roachpb.ClientTenantFromContext(ctx) if ok && tid == tenantID && len(req.Requests) > 0 { - scan, ok := req.Requests[0].GetInner().(*kvpb.ScanRequest) - if ok && tableSpan.ContainsKey(scan.Key) { - return true + scan, ok := req.Requests[0].GetInner().(*kvpb.GetRequest) + if ok { + if tableSpan.ContainsKey(scan.Key) { + log.Infof(ctx, "matchBatch %d", goid.Get()) + return true + } } } return false @@ -885,18 +890,31 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) { Store: &kvserver.StoreTestingKnobs{ TestingRequestFilter: func(ctx context.Context, req *kvpb.BatchRequest) *kvpb.Error { if matchBatch(ctx, req) { + m := atomic.AddInt64(&matches, 1) + // If any of the blockers get retried just ignore. + if m > int64(numBlockers) { + log.Infof(ctx, "ignoring extra blocker %d", goid.Get()) + return nil + } // Notify we're blocking. + log.Infof(ctx, "blocking %d", goid.Get()) unblockClientCh <- struct{}{} <-qBlockersCh } return nil }, TestingResponseErrorEvent: func(ctx context.Context, req *kvpb.BatchRequest, err error) { - if matchBatch(ctx, req) { + tid, ok := roachpb.ClientTenantFromContext(ctx) + if ok && tid == tenantID && len(req.Requests) > 0 { scan, ok := req.Requests[0].GetInner().(*kvpb.ScanRequest) - if ok && tableSpan.ContainsKey(scan.Key) { - cancel() - wg.Done() + log.Infof(ctx, "%s %d", scan, goid.Get()) + if ok { + if tableSpan.ContainsKey(scan.Key) && !sawit { + sawit = true + log.Infof(ctx, "got scan request error %d", goid.Get()) + cancel() + wg.Done() + } } } }, @@ -911,8 +929,8 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) { defer db.Close() r1 := sqlutils.MakeSQLRunner(db) - r1.Exec(t, `CREATE TABLE foo (t int)`) - + r1.Exec(t, `SET CLUSTER SETTING sql.stats.automatic_collection.enabled=false`) + r1.Exec(t, `CREATE TABLE foo (t int PRIMARY KEY)`) row := r1.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'foo'`) var id int64 row.Scan(&id) @@ -931,18 +949,24 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) { for _, r := range blockers { go func(r *sqlutils.SQLRunner) { defer wg.Done() - r.Exec(t, `SELECT * FROM foo`) + r.Exec(t, `SELECT * FROM foo WHERE t = 1234`) }(r) } // Wait till all blockers are parked. for i := 0; i < numBlockers; i++ { <-unblockClientCh } - client.ExpectErr(t, "timeout", `SELECT * FROM foo`) - // Unblock the blockers. + log.Infof(ctx, "blockers parked") + // Because we don't know when statement timeout will happen we have to repeat + // till we get one into the KV layer. + for !sawit { + _, err := client.DB.ExecContext(context.Background(), `SELECT * FROM foo`) + log.Infof(ctx, "main req finished: %v, %t", err, sawit) + } for i := 0; i < numBlockers; i++ { qBlockersCh <- struct{}{} } + log.Infof(ctx, "unblocked blockers") wg.Wait() require.ErrorIs(t, ctx.Err(), context.Canceled) }