Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: add a regression test for virtual table generation hang #100003

Merged
merged 1 commit into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ func tsOrNull(micros int64) (tree.Datum, error) {
}

const (
// systemJobsAndJobInfoBaseQuery consults both the `system.jobs` and
// SystemJobsAndJobInfoBaseQuery consults both the `system.jobs` and
// `system.job_info` tables to return relevant information about a job.
//
// NB: Every job on creation writes a row each for its payload and progress to
Expand All @@ -892,7 +892,7 @@ const (
// Theoretically, a job could have no rows corresponding to its progress and
// so we perform a LEFT JOIN to get a NULL value when no progress row is
// found.
systemJobsAndJobInfoBaseQuery = `
SystemJobsAndJobInfoBaseQuery = `
WITH
latestpayload AS (SELECT job_id, value FROM system.job_info AS payload WHERE info_key = 'legacy_payload'::BYTES),
latestprogress AS (SELECT job_id, value FROM system.job_info AS progress WHERE info_key = 'legacy_progress'::BYTES)
Expand Down Expand Up @@ -941,7 +941,7 @@ func getInternalSystemJobsQueryFromClusterVersion(
) string {
var baseQuery string
if version.IsActive(ctx, clusterversion.V23_1JobInfoTableIsBackfilled) {
baseQuery = systemJobsAndJobInfoBaseQuery
baseQuery = SystemJobsAndJobInfoBaseQuery
} else if version.IsActive(ctx, clusterversion.V23_1BackfillTypeColumnInJobsTable) {
baseQuery = systemJobsBaseQuery
} else {
Expand Down
68 changes: 68 additions & 0 deletions pkg/sql/crdb_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
Expand All @@ -56,6 +58,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgradebase"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -1531,3 +1534,68 @@ func TestInternalSystemJobsAccess(t *testing.T) {
// Admins can see all jobs.
rootDB.CheckQueryResults(t, "SELECT id FROM crdb_internal.system_jobs WHERE id IN (1,2,3) ORDER BY id", [][]string{{"1"}, {"2"}, {"3"}})
}

// TestVirtualTableDoesntHangOnQueryCanceledError is a regression test for
// #99753 which verifies that virtual table generation doesn't hang when the
// worker goroutine returns "query canceled error".
//
// The test aims to replicate the scenario observed in #99753 as closely as
// possible. In particular, the following setup is used:
// - simulate automatic collection of table statistics for a table
// - automatic stats collection - before creating the corresponding job -
// verifies that there is no other concurrent job for the table already
// - that check is done via jobs.RunningJobExists which internally issues a
// query against crdb_internal.system_jobs virtual table
// - that virtual table is generated by issuing another "system-jobs-scan"
// internal query
// - during that "system-jobs-scan" query we're injecting the query canceled
// error (in other words, the error is injected during the generation of
// crdb_internal.system_jobs virtual table).
//
// The injection is achieved by adding a callback to DistSQLReceiver.Push which
// replaces the first piece of metadata it sees with the error.
func TestVirtualTableDoesntHangOnQueryCanceledError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// addCallback determines whether the push callback should be added.
var addCallback atomic.Bool
var numCallbacksAdded atomic.Int32
err := cancelchecker.QueryCanceledError
tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
DistSQLReceiverPushCallbackFactory: func(query string) func(rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
if !addCallback.Load() || strings.HasPrefix(query, sql.SystemJobsAndJobInfoBaseQuery) {
return nil
}
numCallbacksAdded.Add(1)
return func(row rowenc.EncDatumRow, meta *execinfrapb.ProducerMetadata) {
if meta != nil {
*meta = execinfrapb.ProducerMetadata{}
meta.Err = err
}
}
},
},
},
Insecure: true,
}})
defer tc.Stopper().Stop(ctx)

db := tc.ServerConn(0 /* idx */)
sqlDB := sqlutils.MakeSQLRunner(db)
// Disable auto stats so that it doesn't interfere with the test.
sqlDB.Exec(t, "CREATE TABLE t (k INT PRIMARY KEY) WITH (sql_stats_automatic_collection_enabled = false)")

addCallback.Store(true)
// Collect the stats on `t` as if it was done automatically.
statsQuery := fmt.Sprintf("CREATE STATISTICS %s FROM t", jobspb.AutoStatsName)
sqlDB.ExpectErr(t, err.Error(), statsQuery)
addCallback.Store(false)

// Sanity check that the callback was added at least once.
require.Greater(t, numCallbacksAdded.Load(), int32(0))
}