diff --git a/pkg/sql/catalog/desctestutils/descriptor_test_utils.go b/pkg/sql/catalog/desctestutils/descriptor_test_utils.go index bcaace079b5a..e04b7903f5dd 100644 --- a/pkg/sql/catalog/desctestutils/descriptor_test_utils.go +++ b/pkg/sql/catalog/desctestutils/descriptor_test_utils.go @@ -165,7 +165,7 @@ func TestingGetPublicTypeDescriptor( return TestingGetTypeDescriptor(kvDB, codec, database, "public", object) } -func TestGetFunctionDescriptor( +func TestingGetFunctionDescriptor( kvDB *kv.DB, codec keys.SQLCodec, database string, schema string, fName string, ) catalog.FunctionDescriptor { db := TestingGetDatabaseDescriptor(kvDB, codec, database) diff --git a/pkg/sql/catalog/redact/redact_test.go b/pkg/sql/catalog/redact/redact_test.go index 3cf4bb53eee2..0c7e28a6f00d 100644 --- a/pkg/sql/catalog/redact/redact_test.go +++ b/pkg/sql/catalog/redact/redact_test.go @@ -65,7 +65,7 @@ $$`) }) t.Run("create function", func(t *testing.T) { - fn := desctestutils.TestGetFunctionDescriptor(kvDB, keys.SystemSQLCodec, "defaultdb", "public", "f1") + fn := desctestutils.TestingGetFunctionDescriptor(kvDB, keys.SystemSQLCodec, "defaultdb", "public", "f1") mut := funcdesc.NewBuilder(fn.FuncDesc()).BuildCreatedMutableFunction() require.Empty(t, redact.Redact(mut.DescriptorProto())) require.Equal(t, `SELECT k FROM defaultdb.public.kv WHERE v != '_'; SELECT k FROM defaultdb.public.kv WHERE v = '_';`, mut.FunctionBody) diff --git a/pkg/upgrade/upgrades/first_upgrade.go b/pkg/upgrade/upgrades/first_upgrade.go index 3f04b2dee86b..8750e05dba4b 100644 --- a/pkg/upgrade/upgrades/first_upgrade.go +++ b/pkg/upgrade/upgrades/first_upgrade.go @@ -12,9 +12,12 @@ package upgrades import ( "context" + "fmt" "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/upgrade" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -35,13 +38,90 @@ import ( func 
FirstUpgradeFromReleasePrecondition( ctx context.Context, _ clusterversion.ClusterVersion, d upgrade.TenantDeps, ) error { - const q = `SELECT count(*) FROM "".crdb_internal.invalid_objects AS OF SYSTEM TIME '-10s'` - row, err := d.InternalExecutor.QueryRow(ctx, "query-invalid-objects", nil /* txn */, q) - if err != nil { + // For performance reasons, we look back in time when performing + // a diagnostic query. If no corruptions were found back then, we assume that + // there are no corruptions now. Otherwise, we retry and do everything + // without an AOST clause henceforth. + withAOST := true + diagnose := func(tbl string) (hasRows bool, err error) { + q := fmt.Sprintf("SELECT count(*) FROM \"\".crdb_internal.%s", tbl) + if withAOST { + q = q + " AS OF SYSTEM TIME '-10s'" + } + row, err := d.InternalExecutor.QueryRow(ctx, "query-"+tbl, nil /* txn */, q) + if err == nil && row[0].String() != "0" { + hasRows = true + } + return hasRows, err + } + // Check for possibility of time travel. + if hasRows, err := diagnose("databases"); err != nil { + return err + } else if !hasRows { + // We're looking back in time to before the cluster was bootstrapped + // and no databases exist at that point. Disable time-travel henceforth. + withAOST = false + } + // Check for repairable catalog corruptions. + if hasRows, err := diagnose("kv_repairable_catalog_corruptions"); err != nil { return err + } else if hasRows { + // Attempt to repair catalog corruptions in batches. 
+ log.Info(ctx, "auto-repairing catalog corruptions detected during upgrade attempt") + var n int + const repairQuery = ` +SELECT + count(*) +FROM + ( + SELECT + crdb_internal.repair_catalog_corruption(id, corruption) AS was_repaired + FROM + "".crdb_internal.kv_repairable_catalog_corruptions + LIMIT + 1000 + ) +WHERE + was_repaired` + for { + row, err := d.InternalExecutor.QueryRow( + ctx, "repair-catalog-corruptions", nil /* txn */, repairQuery, + ) + if err != nil { + return err + } + c := tree.MustBeDInt(row[0]) + if c == 0 { + break + } + n += int(c) + log.Infof(ctx, "repaired %d catalog corruptions", c) + } + if n == 0 { + log.Info(ctx, "no catalog corruptions found to repair during upgrade attempt") + } else { + // Repairs have actually been performed: stop all time travel henceforth. + withAOST = false + log.Infof(ctx, "%d catalog corruptions have been repaired in total", n) + } } - if n := row[0].String(); n != "0" { - return errors.AssertionFailedf("%s row(s) found in \"\".crdb_internal.invalid_objects, expected none", n) + // Check for all known catalog corruptions. + if hasRows, err := diagnose("invalid_objects"); err != nil { + return err + } else if !hasRows { + return nil + } + if !withAOST { + return errors.AssertionFailedf("\"\".crdb_internal.invalid_objects is not empty") + } + // At this point, corruptions were found using the AS OF SYSTEM TIME clause. + // Re-run the diagnosis without the clause, because we might not be seeing + // repairs which might have taken place recently. 
+ withAOST = false + if hasRows, err := diagnose("invalid_objects"); err != nil { + return err + } else if !hasRows { + return nil } - return nil + return errors.AssertionFailedf("\"\".crdb_internal.invalid_objects is not empty") } diff --git a/pkg/upgrade/upgrades/first_upgrade_test.go b/pkg/upgrade/upgrades/first_upgrade_test.go index 31bd5bf5304a..c3d9cd4c6014 100644 --- a/pkg/upgrade/upgrades/first_upgrade_test.go +++ b/pkg/upgrade/upgrades/first_upgrade_test.go @@ -22,7 +22,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/funcdesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -70,7 +72,7 @@ func TestFirstUpgrade(t *testing.T) { "CREATE TABLE foo (i INT PRIMARY KEY, j INT, INDEX idx(j))", ) - // Corrupt the table descriptor. + // Corrupt the table descriptor in an unrecoverable manner. tbl := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "test", "foo") descKey := catalogkeys.MakeDescMetadataKey(keys.SystemSQLCodec, tbl.GetID()) require.NoError(t, kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { @@ -87,7 +89,7 @@ func TestFirstUpgrade(t *testing.T) { // Try upgrading the cluster version, precondition check should fail. const qUpgrade = "SET CLUSTER SETTING version = crdb_internal.node_executable_version()" tdb.ExpectErr( - t, `verifying precondition for version .* 1 row.* found in .*invalid_objects`, qUpgrade, + t, `verifying precondition for version .*invalid_objects is not empty`, qUpgrade, ) // Unbreak the table descriptor. 
@@ -102,3 +104,111 @@ func TestFirstUpgrade(t *testing.T) { // Upgrade the cluster version. tdb.Exec(t, qUpgrade) } + +// TestFirstUpgradeRepair tests the correct repair behavior of upgrade +// steps which are implicitly defined for each V[0-9]+_[0-9]+Start cluster +// version key. +func TestFirstUpgradeRepair(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + var ( + v0 = clusterversion.TestingBinaryMinSupportedVersion + v1 = clusterversion.ByKey(clusterversion.BinaryVersionKey) + ) + + ctx := context.Background() + settings := cluster.MakeTestingClusterSettingsWithVersions(v1, v0, false /* initializeVersion */) + require.NoError(t, clusterversion.Initialize(ctx, v0, &settings.SV)) + testServer, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + BinaryVersionOverride: v0, + }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + }, + }) + defer testServer.Stopper().Stop(ctx) + + // Set up the test cluster schema. + tdb := sqlutils.MakeSQLRunner(sqlDB) + execStmts := func(t *testing.T, stmts ...string) { + for _, stmt := range stmts { + tdb.Exec(t, stmt) + } + } + + // Create a table and a function for this test. + execStmts(t, + "CREATE DATABASE test", + "USE test", + "CREATE TABLE foo (i INT PRIMARY KEY, j INT, INDEX idx(j))", + "INSERT INTO foo VALUES (1, 2)", + "CREATE FUNCTION test.public.f() RETURNS INT LANGUAGE SQL AS $$ SELECT 1 $$", + ) + + // Corrupt back references in the test table and function descriptors. 
+ codec := keys.SystemSQLCodec + tbl := desctestutils.TestingGetPublicTableDescriptor(kvDB, codec, "test", "foo") + fn := desctestutils.TestingGetFunctionDescriptor(kvDB, codec, "test", "public", "f") + require.NoError(t, kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + b := txn.NewBatch() + tbl := tabledesc.NewBuilder(tbl.TableDesc()).BuildExistingMutableTable() + tbl.InboundFKs = []descpb.ForeignKeyConstraint{{ + OriginTableID: 123456789, + OriginColumnIDs: []descpb.ColumnID{1}, + ReferencedColumnIDs: tbl.PublicColumnIDs(), + ReferencedTableID: tbl.GetID(), + Name: "corrupt_fk", + Validity: descpb.ConstraintValidity_Validated, + ConstraintID: tbl.NextConstraintID, + }} + tbl.NextConstraintID++ + b.Put(catalogkeys.MakeDescMetadataKey(codec, tbl.GetID()), tbl.DescriptorProto()) + fn := funcdesc.NewBuilder(fn.FuncDesc()).BuildExistingMutableFunction() + fn.DependedOnBy = []descpb.FunctionDescriptor_Reference{{ + ID: 123456789, + ColumnIDs: []descpb.ColumnID{1}, + }} + b.Put(catalogkeys.MakeDescMetadataKey(codec, fn.GetID()), fn.DescriptorProto()) + return txn.Run(ctx, b) + })) + + // The corruption should remain undetected for DML queries. + tdb.CheckQueryResults(t, "SELECT * FROM test.public.foo", [][]string{{"1", "2"}}) + tdb.CheckQueryResults(t, "SELECT test.public.f()", [][]string{{"1"}}) + + // The corruption should interfere with DDL statements. + const errRE = "referenced table ID 123456789: referenced descriptor not found" + tdb.ExpectErr(t, errRE, "ALTER TABLE test.public.foo RENAME TO bar") + tdb.ExpectErr(t, errRE, "ALTER FUNCTION test.public.f RENAME TO g") + + // Check that the corruption is detected by invalid_objects. + const qDetectCorruption = `SELECT count(*) FROM "".crdb_internal.invalid_objects` + tdb.CheckQueryResults(t, qDetectCorruption, [][]string{{"2"}}) + + // Check that the corruption is detected by kv_repairable_catalog_corruptions. 
+ const qDetectRepairableCorruption = ` + SELECT count(*) FROM "".crdb_internal.kv_repairable_catalog_corruptions` + tdb.CheckQueryResults(t, qDetectRepairableCorruption, [][]string{{"2"}}) + + // Wait long enough for precondition check to be effective. + tdb.Exec(t, "CREATE DATABASE test2") + const qWaitForAOST = "SELECT count(*) FROM [SHOW DATABASES] AS OF SYSTEM TIME '-10s'" + tdb.CheckQueryResultsRetry(t, qWaitForAOST, [][]string{{"5"}}) + + // Try upgrading the cluster version. + // Precondition check should repair all corruptions and upgrade should succeed. + const qUpgrade = "SET CLUSTER SETTING version = crdb_internal.node_executable_version()" + tdb.Exec(t, qUpgrade) + tdb.CheckQueryResults(t, qDetectCorruption, [][]string{{"0"}}) + tdb.CheckQueryResults(t, qDetectRepairableCorruption, [][]string{{"0"}}) + + // Check that the table and function are OK. + tdb.CheckQueryResults(t, "SELECT * FROM test.public.foo", [][]string{{"1", "2"}}) + tdb.Exec(t, "ALTER TABLE test.foo ADD COLUMN k INT DEFAULT 42") + tdb.CheckQueryResults(t, "SELECT * FROM test.public.foo", [][]string{{"1", "2", "42"}}) + tdb.CheckQueryResults(t, "SELECT test.public.f()", [][]string{{"1"}}) +}