From dc17fa12247e5653cb3603ee967afe1c338db2c5 Mon Sep 17 00:00:00 2001 From: Jeff Date: Thu, 13 Jul 2023 16:00:33 +0000 Subject: [PATCH] sql: add test to reproduce crdb_internal.jobs upgrade bug This test reliably reproduces the source of flakes in #106648. Release note: None Part of: #106648 --- pkg/sql/crdb_internal_test.go | 53 +++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/pkg/sql/crdb_internal_test.go b/pkg/sql/crdb_internal_test.go index 2f6cb8015e38..402da909b100 100644 --- a/pkg/sql/crdb_internal_test.go +++ b/pkg/sql/crdb_internal_test.go @@ -1461,6 +1461,59 @@ func TestInternalSystemJobsTableWorksWithVersionPreV23_1BackfillTypeColumnInJobs ) } +// TestInternalSystemJobsTableWorksWithAsOfSystemTime tests that +// crdb_internal.system_jobs and crdb_internal.jobs work when the server is +// fully upgraded, but the query is run with an ASOST clause at a timestamp +// before the V23_1AddTypeColumnToJobsTable migration. +func TestInternalSystemJobsTableWorksWithAsOfSystemTime(t *testing.T) { + // TODO so there is an amazing implication here which is we can't serve AS OF + // SYSTEM TIME queries for a version that is out of the binary support + // window, otherwise the cluster may not understand the descriptors. + _ = clusterversion.V23_1BackfillTypeColumnInJobsTable + + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + BinaryVersionOverride: clusterversion.ByKey(clusterversion.BinaryMinSupportedVersionKey), + }, + }, + }) + ctx := context.Background() + defer s.Stopper().Stop(ctx) + tdb := sqlutils.MakeSQLRunner(db) + + // Get a timestamp before upgrades are run + var time string + row := tdb.QueryRow(t, "SELECT current_timestamp();") + row.Scan(&time) + + // Run upgrades + finalVersion := clusterversion.ByKey(clusterversion.BinaryVersionKey).String() + tdb.Exec(t, `SET CLUSTER SETTING version = $1`, finalVersion) + + // Run queries with a system time before the upgrade + tdb.Exec(t, + fmt.Sprintf("SELECT * FROM crdb_internal.jobs AS OF SYSTEM TIME '%s'", time), + ) + tdb.Exec(t, + fmt.Sprintf("SELECT * FROM crdb_internal.system_jobs AS OF SYSTEM TIME '%s'", time), + ) + // Exercise indexes. + tdb.Exec(t, + fmt.Sprintf("SELECT * FROM crdb_internal.system_jobs AS OF SYSTEM TIME '%s' WHERE job_type = 'CHANGEFEED'", time), + ) + tdb.Exec(t, + fmt.Sprintf("SELECT * FROM crdb_internal.system_jobs AS OF SYSTEM TIME '%s' WHERE id = 0", time), + ) + tdb.Exec(t, + fmt.Sprintf("SELECT * FROM crdb_internal.system_jobs AS OF SYSTEM TIME '%s' WHERE status = 'running'", time), + ) +} + // TestCorruptPayloadError asserts that we can an error // with the correct hint when we fail to decode a payload. func TestCorruptPayloadError(t *testing.T) {