From 168b2b832d50f4c00c087824b75abbe1ff9c3815 Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Mon, 24 Jul 2023 13:13:44 -0400 Subject: [PATCH 1/4] sql: revalidate_unique_constraints_in_all_tables check for cancellation Previously, the function: revalidate_unique_constraints_in_all_tables did not properly check for cancellation when looping over all descriptors. This could just lead to a delayed cancellation or timeouts in tests. To address this, this patch updates the logic to check for cancellations while loop over descriptors. Fixes: #107410, #107411 Release note: None --- pkg/sql/check.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/sql/check.go b/pkg/sql/check.go index 81ead697aae1..7a5ad31e0555 100644 --- a/pkg/sql/check.go +++ b/pkg/sql/check.go @@ -434,6 +434,11 @@ func (p *planner) RevalidateUniqueConstraintsInCurrentDB(ctx context.Context) er return err } return inDB.ForEachDescriptor(func(desc catalog.Descriptor) error { + // If the context is cancelled, then we should bail out, since + // the actual revalidate operation might not check anything. + if err := ctx.Err(); err != nil { + return err + } tableDesc, err := catalog.AsTableDescriptor(desc) if err != nil { return err From 679cde57941b6aedbaf8e5e93f7e8018b2d61d75 Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Mon, 24 Jul 2023 13:58:42 -0400 Subject: [PATCH 2/4] sql: TestRandomSyntaxFunctions skip crdb_internal.gen_rand_ident Previously, the TestRandomSyntaxFunctions could invoke crdb_internal.gen_rand_ident with random arguments, which could be giant number of random identifiers. To address this, this patch will avoid validating this function. Release note: None --- pkg/sql/tests/rsg_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/sql/tests/rsg_test.go b/pkg/sql/tests/rsg_test.go index 67bfeeab25bc..5f61ff324321 100644 --- a/pkg/sql/tests/rsg_test.go +++ b/pkg/sql/tests/rsg_test.go @@ -358,6 +358,10 @@ func TestRandomSyntaxFunctions(t *testing.T) { switch lower { case "pg_sleep": continue + case "crdb_internal.gen_rand_ident": + // Generates random identifiers, so a large number are dangerous and + // can take a long time. + continue case "st_frechetdistance", "st_buffer": // Some spatial function are slow and testing them here // is not worth it. From 3fa2769586d8fea466fffa37fac17d2bc831d30b Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Fri, 4 Aug 2023 11:50:09 -0400 Subject: [PATCH 3/4] sql: TestRandomSyntaxFunctions skip schema telemetry job function Previously, this test could depending on the concurrency, could spawn a large number of schema telemetry jobs via (ctdb_internal.create_sql_schema_telemetry_job). This could lead to contention that will eventually cause this test to time out. To address this, this patch limits calling the telemetry job creation function. Fixes: #108153 Release note: None --- pkg/sql/tests/rsg_test.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/sql/tests/rsg_test.go b/pkg/sql/tests/rsg_test.go index 5f61ff324321..65697699f1db 100644 --- a/pkg/sql/tests/rsg_test.go +++ b/pkg/sql/tests/rsg_test.go @@ -175,6 +175,7 @@ func (db *verifyFormatDB) execWithResettableTimeout( }() retry := true targetDuration := duration + cancellationChannel := ctx.Done() for retry { retry = false err := func() error { @@ -202,7 +203,7 @@ func (db *verifyFormatDB) execWithResettableTimeout( return &nonCrasher{sql: sql, err: err} } return nil - case <-ctx.Done(): + case <-cancellationChannel: // Sanity: The context is cancelled when the test is about to // timeout. We will log whatever statement we're waiting on for // debugging purposes. Sometimes queries won't respect @@ -211,6 +212,7 @@ func (db *verifyFormatDB) execWithResettableTimeout( // We will intentionally retry, which will us to wait for the // go routine to complete above to avoid leaking it. retry = true + cancellationChannel = nil return nil case <-time.After(targetDuration): db.mu.Lock() @@ -358,6 +360,10 @@ func TestRandomSyntaxFunctions(t *testing.T) { switch lower { case "pg_sleep": continue + case "crdb_internal.create_sql_schema_telemetry_job": + // We can create a crazy number of telemtry jobs accidentally, + // within the test. Leading to terrible contention. + continue case "crdb_internal.gen_rand_ident": // Generates random identifiers, so a large number are dangerous and // can take a long time. @@ -437,6 +443,7 @@ func TestRandomSyntaxFunctions(t *testing.T) { // involve schema changes like truncates. In general this should make // this test more resilient as the timeouts are reset as long progress // is made on *some* connection. + t.Logf("Running %q", s) return db.execWithResettableTimeout(t, ctx, s, *flagRSGExecTimeout, *flagRSGGoRoutines) }) } From b957951e0909ef3ca196f561c79e77bd96b60c52 Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Tue, 15 Aug 2023 20:16:08 +0000 Subject: [PATCH 4/4] sql: fix use of span after finish in CREATE STATISTICS Previously, the code for creating and waiting for the job inside CREATE STATISTICs had a race condition, where we could end up using context after a tracing span was cleaned up on it. To address this, this patch adjusts the code to wait for a go routine to clean up before, returning and allowing the span to clean up. Epic: none Release note: None --- pkg/sql/create_stats.go | 44 ++++------------------ pkg/sql/stats/create_stats_job_test.go | 52 ++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 36 deletions(-) diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index 52dae53b485d..adc745934707 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -84,7 +84,7 @@ func StubTableStats( } // createStatsNode is a planNode implemented in terms of a function. The -// startJob function starts a Job during Start, and the remainder of the +// runJob function starts a Job during Start, and the remainder of the // CREATE STATISTICS planning and execution is performed within the jobs // framework. type createStatsNode struct { @@ -97,49 +97,23 @@ type createStatsNode struct { // If it is false, the flow for create statistics is planned directly; this // is used when the statement is under EXPLAIN or EXPLAIN ANALYZE. runAsJob bool - - run createStatsRun -} - -// createStatsRun contains the run-time state of createStatsNode during local -// execution. -type createStatsRun struct { - resultsCh chan tree.Datums - errCh chan error } func (n *createStatsNode) startExec(params runParams) error { telemetry.Inc(sqltelemetry.SchemaChangeCreateCounter("stats")) - n.run.resultsCh = make(chan tree.Datums) - n.run.errCh = make(chan error) - go func() { - err := n.startJob(params.ctx, n.run.resultsCh) - select { - case <-params.ctx.Done(): - case n.run.errCh <- err: - } - close(n.run.errCh) - close(n.run.resultsCh) - }() - return nil + return n.runJob(params.ctx) } func (n *createStatsNode) Next(params runParams) (bool, error) { - select { - case <-params.ctx.Done(): - return false, params.ctx.Err() - case err := <-n.run.errCh: - return false, err - case <-n.run.resultsCh: - return true, nil - } + return false, nil } func (*createStatsNode) Close(context.Context) {} func (*createStatsNode) Values() tree.Datums { return nil } -// startJob starts a CreateStats job to plan and execute statistics creation. -func (n *createStatsNode) startJob(ctx context.Context, resultsCh chan<- tree.Datums) error { +// runJob starts a CreateStats job synchronously to plan and execute +// statistics creation and then waits for the job to complete. +func (n *createStatsNode) runJob(ctx context.Context) error { record, err := n.makeJobRecord(ctx) if err != nil { return err @@ -172,8 +146,7 @@ func (n *createStatsNode) startJob(ctx context.Context, resultsCh chan<- tree.Da if err := job.Start(ctx); err != nil { return err } - - if err := job.AwaitCompletion(ctx); err != nil { + if err = job.AwaitCompletion(ctx); err != nil { if errors.Is(err, stats.ConcurrentCreateStatsError) { // Delete the job so users don't see it and get confused by the error. const stmt = `DELETE FROM system.jobs WHERE id = $1` @@ -183,9 +156,8 @@ func (n *createStatsNode) startJob(ctx context.Context, resultsCh chan<- tree.Da log.Warningf(ctx, "failed to delete job: %v", delErr) } } - return err } - return nil + return err } // makeJobRecord creates a CreateStats job record which can be used to plan and diff --git a/pkg/sql/stats/create_stats_job_test.go b/pkg/sql/stats/create_stats_job_test.go index 55ea334a782e..7d1b5c2e90e6 100644 --- a/pkg/sql/stats/create_stats_job_test.go +++ b/pkg/sql/stats/create_stats_job_test.go @@ -29,6 +29,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/encoding" @@ -36,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" ) // TestCreateStatsControlJob tests that PAUSE JOB, RESUME JOB, and CANCEL JOB @@ -122,6 +125,55 @@ func TestCreateStatsControlJob(t *testing.T) { }) } +func TestCreateStatisticsCanBeCancelled(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + skip.UnderStress(t, "test can be slow to quiesce because of filter") + + var allowRequest chan struct{} + + var serverArgs base.TestServerArgs + filter, setTableID := createStatsRequestFilter(&allowRequest) + serverArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals() + serverArgs.Knobs.Store = &kvserver.StoreTestingKnobs{ + TestingRequestFilter: filter, + } + + ctx := context.Background() + tc, conn, _ := serverutils.StartServer(t, serverArgs) + defer tc.Stopper().Stop(ctx) + sqlDB := sqlutils.MakeSQLRunner(conn) + + sqlDB.Exec(t, `CREATE DATABASE d`) + sqlDB.Exec(t, `CREATE TABLE d.t (x INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO d.t SELECT generate_series(1,1000)`) + var tID descpb.ID + sqlDB.QueryRow(t, `SELECT 'd.t'::regclass::int`).Scan(&tID) + setTableID(tID) + + // Run CREATE STATISTICS and wait for to create the job. + allowRequest = make(chan struct{}) + errCh := make(chan error) + go func() { + _, err := conn.Exec(`CREATE STATISTICS s1 FROM d.t`) + errCh <- err + }() + allowRequest <- struct{}{} + testutils.SucceedsSoon(t, func() error { + row := conn.QueryRow("SELECT query_id FROM [SHOW CLUSTER STATEMENTS] WHERE query LIKE 'CREATE STATISTICS%';") + var queryID string + if err := row.Scan(&queryID); err != nil { + return err + } + _, err := conn.Exec("CANCEL QUERIES VALUES ((SELECT query_id FROM [SHOW CLUSTER STATEMENTS] WHERE query LIKE 'CREATE STATISTICS%'));") + return err + }) + err := <-errCh + allowRequest <- struct{}{} + + require.ErrorContains(t, err, "pq: query execution canceled") +} + func TestAtMostOneRunningCreateStats(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t)