diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 17fc3be4519a..b339007f8d52 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -882,7 +882,7 @@ func tsOrNull(micros int64) (tree.Datum, error) { } const ( - // systemJobsAndJobInfoBaseQuery consults both the `system.jobs` and + // SystemJobsAndJobInfoBaseQuery consults both the `system.jobs` and // `system.job_info` tables to return relevant information about a job. // // NB: Every job on creation writes a row each for its payload and progress to @@ -892,7 +892,7 @@ const ( // Theoretically, a job could have no rows corresponding to its progress and // so we perform a LEFT JOIN to get a NULL value when no progress row is // found. - systemJobsAndJobInfoBaseQuery = ` + SystemJobsAndJobInfoBaseQuery = ` WITH latestpayload AS (SELECT job_id, value FROM system.job_info AS payload WHERE info_key = 'legacy_payload'::BYTES), latestprogress AS (SELECT job_id, value FROM system.job_info AS progress WHERE info_key = 'legacy_progress'::BYTES) @@ -941,7 +941,7 @@ func getInternalSystemJobsQueryFromClusterVersion( ) string { var baseQuery string if version.IsActive(ctx, clusterversion.V23_1JobInfoTableIsBackfilled) { - baseQuery = systemJobsAndJobInfoBaseQuery + baseQuery = SystemJobsAndJobInfoBaseQuery } else if version.IsActive(ctx, clusterversion.V23_1BackfillTypeColumnInJobsTable) { baseQuery = systemJobsBaseQuery } else { diff --git a/pkg/sql/crdb_internal_test.go b/pkg/sql/crdb_internal_test.go index 994ed75e6ef1..a5d6bf7874e4 100644 --- a/pkg/sql/crdb_internal_test.go +++ b/pkg/sql/crdb_internal_test.go @@ -45,8 +45,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/distsql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/parser" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" @@ -56,6 +58,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/upgrade/upgradebase" + "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -1531,3 +1534,68 @@ func TestInternalSystemJobsAccess(t *testing.T) { // Admins can see all jobs. rootDB.CheckQueryResults(t, "SELECT id FROM crdb_internal.system_jobs WHERE id IN (1,2,3) ORDER BY id", [][]string{{"1"}, {"2"}, {"3"}}) } + +// TestVirtualTableDoesntHangOnQueryCanceledError is a regression test for +// #99753 which verifies that virtual table generation doesn't hang when the +// worker goroutine returns "query canceled error". +// +// The test aims to replicate the scenario observed in #99753 as closely as +// possible. In particular, the following setup is used: +// - simulate automatic collection of table statistics for a table +// - automatic stats collection - before creating the corresponding job - +// verifies that there is no other concurrent job for the table already +// - that check is done via jobs.RunningJobExists which internally issues a +// query against crdb_internal.system_jobs virtual table +// - that virtual table is generated by issuing another "system-jobs-scan" +// internal query +// - during that "system-jobs-scan" query we're injecting the query canceled +// error (in other words, the error is injected during the generation of +// crdb_internal.system_jobs virtual table). +// +// The injection is achieved by adding a callback to DistSQLReceiver.Push which +// replaces the first piece of metadata it sees with the error. +func TestVirtualTableDoesntHangOnQueryCanceledError(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // addCallback determines whether the push callback should be added. + var addCallback atomic.Bool + var numCallbacksAdded atomic.Int32 + err := cancelchecker.QueryCanceledError + tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SQLExecutor: &sql.ExecutorTestingKnobs{ + DistSQLReceiverPushCallbackFactory: func(query string) func(rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { + if !addCallback.Load() || strings.HasPrefix(query, sql.SystemJobsAndJobInfoBaseQuery) { + return nil + } + numCallbacksAdded.Add(1) + return func(row rowenc.EncDatumRow, meta *execinfrapb.ProducerMetadata) { + if meta != nil { + *meta = execinfrapb.ProducerMetadata{} + meta.Err = err + } + } + }, + }, + }, + Insecure: true, + }}) + defer tc.Stopper().Stop(ctx) + + db := tc.ServerConn(0 /* idx */) + sqlDB := sqlutils.MakeSQLRunner(db) + // Disable auto stats so that it doesn't interfere with the test. + sqlDB.Exec(t, "CREATE TABLE t (k INT PRIMARY KEY) WITH (sql_stats_automatic_collection_enabled = false)") + + addCallback.Store(true) + // Collect the stats on `t` as if it was done automatically. + statsQuery := fmt.Sprintf("CREATE STATISTICS %s FROM t", jobspb.AutoStatsName) + sqlDB.ExpectErr(t, err.Error(), statsQuery) + addCallback.Store(false) + + // Sanity check that the callback was added at least once. + require.Greater(t, numCallbacksAdded.Load(), int32(0)) +}