Skip to content

Commit

Permalink
upgrades: try to repair catalog corruptions during upgrade
Browse files Browse the repository at this point in the history
This change enhances the recently-added upgrade precondition check which
checks for catalog corruptions by trying to repair these automatically
when possible.

Fixes: #104425
Fixes: #85265

Release note (general change): upgrading the cluster version to a new
release will not only check for descriptor- and other catalog
corruptions but will attempt to repair some of them on a best-effort
basis. This should seamlessly get rid of all longstanding descriptor
back-reference corruptions, which typically don't manifest themselves
until a schema change or an upgrade are performed.
  • Loading branch information
Marius Posta committed Jul 6, 2023
1 parent e631ce8 commit 7a6036d
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 10 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/catalog/desctestutils/descriptor_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func TestingGetPublicTypeDescriptor(
return TestingGetTypeDescriptor(kvDB, codec, database, "public", object)
}

func TestGetFunctionDescriptor(
func TestingGetFunctionDescriptor(
kvDB *kv.DB, codec keys.SQLCodec, database string, schema string, fName string,
) catalog.FunctionDescriptor {
db := TestingGetDatabaseDescriptor(kvDB, codec, database)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/redact/redact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ $$`)
})

t.Run("create function", func(t *testing.T) {
fn := desctestutils.TestGetFunctionDescriptor(kvDB, keys.SystemSQLCodec, "defaultdb", "public", "f1")
fn := desctestutils.TestingGetFunctionDescriptor(kvDB, keys.SystemSQLCodec, "defaultdb", "public", "f1")
mut := funcdesc.NewBuilder(fn.FuncDesc()).BuildCreatedMutableFunction()
require.Empty(t, redact.Redact(mut.DescriptorProto()))
require.Equal(t, `SELECT k FROM defaultdb.public.kv WHERE v != '_'; SELECT k FROM defaultdb.public.kv WHERE v = '_';`, mut.FunctionBody)
Expand Down
92 changes: 86 additions & 6 deletions pkg/upgrade/upgrades/first_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ package upgrades

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/upgrade"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

Expand All @@ -35,13 +38,90 @@ import (
func FirstUpgradeFromReleasePrecondition(
ctx context.Context, _ clusterversion.ClusterVersion, d upgrade.TenantDeps,
) error {
const q = `SELECT count(*) FROM "".crdb_internal.invalid_objects AS OF SYSTEM TIME '-10s'`
row, err := d.InternalExecutor.QueryRow(ctx, "query-invalid-objects", nil /* txn */, q)
if err != nil {
// For performance reasons, we look back in time when performing
// a diagnostic query. If no corruptions were found back then, we assume that
// there are no corruptions now. Otherwise, we retry and do everything
// without an AOST clause henceforth.
withAOST := true
diagnose := func(tbl string) (hasRows bool, err error) {
q := fmt.Sprintf("SELECT count(*) FROM \"\".crdb_internal.%s", tbl)
if withAOST {
q = q + " AS OF SYSTEM TIME '-10s'"
}
row, err := d.InternalExecutor.QueryRow(ctx, "query-"+tbl, nil /* txn */, q)
if err == nil && row[0].String() != "0" {
hasRows = true
}
return hasRows, err
}
// Check for possibility of time travel.
if hasRows, err := diagnose("databases"); err != nil {
return err
} else if !hasRows {
// We're looking back in time to before the cluster was bootstrapped
// and no databases exist at that point. Disable time-travel henceforth.
withAOST = false
}
// Check for repairable catalog corruptions.
if hasRows, err := diagnose("kv_repairable_catalog_corruptions"); err != nil {
return err
} else if hasRows {
// Attempt to repair catalog corruptions in batches.
log.Info(ctx, "auto-repairing catalog corruptions detected during upgrade attempt")
var n int
const repairQuery = `
SELECT
count(*)
FROM
(
SELECT
crdb_internal.repair_catalog_corruption(id, corruption) AS was_repaired
FROM
"".crdb_internal.kv_repairable_catalog_corruptions
LIMIT
1000
)
WHERE
was_repaired`
for {
row, err := d.InternalExecutor.QueryRow(
ctx, "repair-catalog-corruptions", nil /* txn */, repairQuery,
)
if err != nil {
return err
}
c := tree.MustBeDInt(row[0])
if c == 0 {
break
}
n += int(c)
log.Infof(ctx, "repaired %d catalog corruptions", c)
}
if n == 0 {
log.Info(ctx, "no catalog corruptions found to repair during upgrade attempt")
} else {
// Repairs have actually been performed: stop all time travel henceforth.
withAOST = false
log.Infof(ctx, "%d catalog corruptions have been repaired in total", n)
}
}
if n := row[0].String(); n != "0" {
return errors.AssertionFailedf("%s row(s) found in \"\".crdb_internal.invalid_objects, expected none", n)
// Check for all known catalog corruptions.
if hasRows, err := diagnose("invalid_objects"); err != nil {
return err
} else if !hasRows {
return nil
}
if !withAOST {
return errors.AssertionFailedf("\"\".crdb_internal.invalid_objects is not empty")
}
// At this point, corruptions were found using the AS OF SYSTEM TIME clause.
// Re-run the diagnosis without the clause, because we might not be seeing
// repairs which might have taken place recently.
withAOST = false
if hasRows, err := diagnose("invalid_objects"); err != nil {
return err
} else if !hasRows {
return nil
}
return nil
return errors.AssertionFailedf("\"\".crdb_internal.invalid_objects is not empty")
}
114 changes: 112 additions & 2 deletions pkg/upgrade/upgrades/first_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/funcdesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -70,7 +72,7 @@ func TestFirstUpgrade(t *testing.T) {
"CREATE TABLE foo (i INT PRIMARY KEY, j INT, INDEX idx(j))",
)

// Corrupt the table descriptor.
// Corrupt the table descriptor in an unrecoverable manner.
tbl := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "test", "foo")
descKey := catalogkeys.MakeDescMetadataKey(keys.SystemSQLCodec, tbl.GetID())
require.NoError(t, kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
Expand All @@ -87,7 +89,7 @@ func TestFirstUpgrade(t *testing.T) {
// Try upgrading the cluster version, precondition check should fail.
const qUpgrade = "SET CLUSTER SETTING version = crdb_internal.node_executable_version()"
tdb.ExpectErr(
t, `verifying precondition for version .* 1 row.* found in .*invalid_objects`, qUpgrade,
t, `verifying precondition for version .*invalid_objects is not empty`, qUpgrade,
)

// Unbreak the table descriptor.
Expand All @@ -102,3 +104,111 @@ func TestFirstUpgrade(t *testing.T) {
// Upgrade the cluster version.
tdb.Exec(t, qUpgrade)
}

// TestFirstUpgradeRepair tests the correct repair behavior of upgrade
// steps which are implicitly defined for each V[0-9]+_[0-9]+Start cluster
// version key.
func TestFirstUpgradeRepair(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var (
v0 = clusterversion.TestingBinaryMinSupportedVersion
v1 = clusterversion.ByKey(clusterversion.BinaryVersionKey)
)

ctx := context.Background()
settings := cluster.MakeTestingClusterSettingsWithVersions(v1, v0, false /* initializeVersion */)
require.NoError(t, clusterversion.Initialize(ctx, v0, &settings.SV))
testServer, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: make(chan struct{}),
BinaryVersionOverride: v0,
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
})
defer testServer.Stopper().Stop(ctx)

// Set up the test cluster schema.
tdb := sqlutils.MakeSQLRunner(sqlDB)
execStmts := func(t *testing.T, stmts ...string) {
for _, stmt := range stmts {
tdb.Exec(t, stmt)
}
}

// Create a table and a function for this test.
execStmts(t,
"CREATE DATABASE test",
"USE test",
"CREATE TABLE foo (i INT PRIMARY KEY, j INT, INDEX idx(j))",
"INSERT INTO foo VALUES (1, 2)",
"CREATE FUNCTION test.public.f() RETURNS INT LANGUAGE SQL AS $$ SELECT 1 $$",
)

// Corrupt FK back references in the test table descriptor.
codec := keys.SystemSQLCodec
tbl := desctestutils.TestingGetPublicTableDescriptor(kvDB, codec, "test", "foo")
fn := desctestutils.TestingGetFunctionDescriptor(kvDB, codec, "test", "public", "f")
require.NoError(t, kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
b := txn.NewBatch()
tbl := tabledesc.NewBuilder(tbl.TableDesc()).BuildExistingMutableTable()
tbl.InboundFKs = []descpb.ForeignKeyConstraint{{
OriginTableID: 123456789,
OriginColumnIDs: []descpb.ColumnID{1},
ReferencedColumnIDs: tbl.PublicColumnIDs(),
ReferencedTableID: tbl.GetID(),
Name: "corrupt_fk",
Validity: descpb.ConstraintValidity_Validated,
ConstraintID: tbl.NextConstraintID,
}}
tbl.NextConstraintID++
b.Put(catalogkeys.MakeDescMetadataKey(codec, tbl.GetID()), tbl.DescriptorProto())
fn := funcdesc.NewBuilder(fn.FuncDesc()).BuildExistingMutableFunction()
fn.DependedOnBy = []descpb.FunctionDescriptor_Reference{{
ID: 123456789,
ColumnIDs: []descpb.ColumnID{1},
}}
b.Put(catalogkeys.MakeDescMetadataKey(codec, fn.GetID()), fn.DescriptorProto())
return txn.Run(ctx, b)
}))

// The corruption should remain undetected for DML queries.
tdb.CheckQueryResults(t, "SELECT * FROM test.public.foo", [][]string{{"1", "2"}})
tdb.CheckQueryResults(t, "SELECT test.public.f()", [][]string{{"1"}})

// The corruption should interfere with DDL statements.
const errRE = "referenced table ID 123456789: referenced descriptor not found"
tdb.ExpectErr(t, errRE, "ALTER TABLE test.public.foo RENAME TO bar")
tdb.ExpectErr(t, errRE, "ALTER FUNCTION test.public.f RENAME TO g")

// Check that the corruption is detected by invalid_objects.
const qDetectCorruption = `SELECT count(*) FROM "".crdb_internal.invalid_objects`
tdb.CheckQueryResults(t, qDetectCorruption, [][]string{{"2"}})

// Check that the corruption is detected by kv_repairable_catalog_corruptions.
const qDetectRepairableCorruption = `
SELECT count(*) FROM "".crdb_internal.kv_repairable_catalog_corruptions`
tdb.CheckQueryResults(t, qDetectRepairableCorruption, [][]string{{"2"}})

// Wait long enough for precondition check to be effective.
tdb.Exec(t, "CREATE DATABASE test2")
const qWaitForAOST = "SELECT count(*) FROM [SHOW DATABASES] AS OF SYSTEM TIME '-10s'"
tdb.CheckQueryResultsRetry(t, qWaitForAOST, [][]string{{"5"}})

// Try upgrading the cluster version.
// Precondition check should repair all corruptions and upgrade should succeed.
const qUpgrade = "SET CLUSTER SETTING version = crdb_internal.node_executable_version()"
tdb.Exec(t, qUpgrade)
tdb.CheckQueryResults(t, qDetectCorruption, [][]string{{"0"}})
tdb.CheckQueryResults(t, qDetectRepairableCorruption, [][]string{{"0"}})

// Check that the table and function are OK.
tdb.CheckQueryResults(t, "SELECT * FROM test.public.foo", [][]string{{"1", "2"}})
tdb.Exec(t, "ALTER TABLE test.foo ADD COLUMN k INT DEFAULT 42")
tdb.CheckQueryResults(t, "SELECT * FROM test.public.foo", [][]string{{"1", "2", "42"}})
tdb.CheckQueryResults(t, "SELECT test.public.f()", [][]string{{"1"}})
}

0 comments on commit 7a6036d

Please sign in to comment.