Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
86420: catalog: spin off and adopt back-reference validation level r=postamar a=postamar

Previously, forward- and backward-reference checking was done in a single
validation phase, identified by the
catalog.ValidationLevelCrossReferences constant and a similarly named
method on catalog.Descriptor. This validation phase was exercised both
when reading and when writing descriptors.

The problem with this was that a corrupt back-reference would make
a table unusable until descriptor surgery was performed. While this
might be warranted in the context of schema changes, that's not the case
when it comes to queries, which don't really care about these
back-references. For this reason, we want to bypass back-reference
validation when reading descriptors for the purpose of serving queries.
These reads are characterized by the flag AvoidLeased being unset.

To do this, this commit splits the cross-reference validation level into
two. The back-reference level now exists to check the integrity of
back-references directly, as well as to check that forward references
also have matching back-references in the referenced descriptors.

Fixes cockroachdb#85263.

Release justification: low-risk, high benefit change
Release note: None

---- 

This PR is based on cockroachdb#87067 which is the product of what made sense for me to take action on right now regarding cockroachdb#86420 (comment) .



Co-authored-by: Marius Posta <[email protected]>
  • Loading branch information
craig[bot] and Marius Posta committed Sep 8, 2022
2 parents 07c8664 + 1a66a9b commit 01eb6ab
Show file tree
Hide file tree
Showing 28 changed files with 500 additions and 355 deletions.
223 changes: 51 additions & 172 deletions pkg/ccl/backupccl/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,49 @@ func TestDataDriven(t *testing.T) {
ds := newDatadrivenTestState()
defer ds.cleanup(ctx)
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {

// execWithTagAndPausePoint runs the datadriven input statement, optionally
// tags the resulting job, and handles an expected pausepoint error.
execWithTagAndPausePoint := func(jobType jobspb.Type) string {
	const user = "root"
	sqlDB := ds.getSQLDB(t, lastCreatedServer, user)

	// Execute the statement under test.
	_, err := sqlDB.Exec(d.Input)

	// Look up the most recently created job of the given type.
	var jobID jobspb.JobID
	{
		const qFmt = `SELECT job_id FROM [SHOW JOBS] WHERE job_type = '%s' ORDER BY created DESC LIMIT 1`
		if errJob := sqlDB.QueryRow(fmt.Sprintf(qFmt, jobType)).Scan(&jobID); !errors.Is(errJob, gosql.ErrNoRows) {
			require.NoError(t, errJob)
		}
		require.NotZerof(t, jobID, "job not found for %q: %+v", d.Input, err)
	}

	// Remember the job under the supplied tag, if any.
	if d.HasArg("tag") {
		var jobTag string
		d.ScanArgs(t, "tag", &jobTag)
		if _, exists := ds.jobTags[jobTag]; exists {
			t.Fatalf("failed to `tag`, job with tag %s already exists", jobTag)
		}
		ds.jobTags[jobTag] = jobID
	}

	// When a pausepoint is expected, the statement must have failed with a
	// matching error, and the job must eventually reach the paused state.
	if d.HasArg("expect-pausepoint") {
		require.NotNilf(t, err, "expected pause point error")
		require.Regexp(t, "pause point .* hit$", err.Error())
		jobutils.WaitForJobToPause(t, sqlutils.MakeSQLRunner(sqlDB), jobID)
		ret := append(ds.noticeBuffer, "job paused at pausepoint")
		return strings.Join(ret, "\n")
	}

	// Any other execution error is unexpected.
	require.NoError(t, err)
	return ""
}

for v := range ds.vars {
d.Input = strings.Replace(d.Input, v, ds.vars[v], -1)
d.Expected = strings.Replace(d.Expected, v, ds.vars[v], -1)
Expand Down Expand Up @@ -608,58 +651,12 @@ func TestDataDriven(t *testing.T) {
return ""

case "backup":
server := lastCreatedServer
user := "root"
jobType := "BACKUP"

// First, run the backup.
_, err := ds.getSQLDB(t, server, user).Exec(d.Input)

// Tag the job.
if d.HasArg("tag") {
tagJob(t, server, user, jobType, ds, d)
}

// Check if we expect a pausepoint error.
if d.HasArg("expect-pausepoint") {
expectPausepoint(t, err, jobType, server, user, ds)
ret := append(ds.noticeBuffer, "job paused at pausepoint")
return strings.Join(ret, "\n")
}

// All other errors are bad.
require.NoError(t, err)
return ""
return execWithTagAndPausePoint(jobspb.TypeBackup)

case "import":
server := lastCreatedServer
user := "root"
jobType := "IMPORT"

// First, run the backup.
_, err := ds.getSQLDB(t, server, user).Exec(d.Input)

// Tag the job.
if d.HasArg("tag") {
tagJob(t, server, user, jobType, ds, d)
}

// Check if we expect a pausepoint error.
if d.HasArg("expect-pausepoint") {
expectPausepoint(t, err, jobType, server, user, ds)
ret := append(ds.noticeBuffer, "job paused at pausepoint")
return strings.Join(ret, "\n")
}

// All other errors are bad.
require.NoError(t, err)
return ""
return execWithTagAndPausePoint(jobspb.TypeImport)

case "restore":
server := lastCreatedServer
user := "root"
jobType := "RESTORE"

if d.HasArg("aost") {
var aost string
d.ScanArgs(t, "aost", &aost)
Expand All @@ -673,95 +670,17 @@ func TestDataDriven(t *testing.T) {
d.Input = strings.Replace(d.Input, aost,
fmt.Sprintf("'%s'", ts), 1)
}

// First, run the restore.
_, err := ds.getSQLDB(t, server, user).Exec(d.Input)

// Tag the job.
if d.HasArg("tag") {
tagJob(t, server, user, jobType, ds, d)
}

// Check if the job must be run aost.
if d.HasArg("aost") {
var aost string
d.ScanArgs(t, "aost", &aost)
}

// Check if we expect a pausepoint error.
if d.HasArg("expect-pausepoint") {
expectPausepoint(t, err, jobType, server, user, ds)
ret := append(ds.noticeBuffer, "job paused at pausepoint")
return strings.Join(ret, "\n")
}

// All other errors are bad.
require.NoError(t, err)
return ""
return execWithTagAndPausePoint(jobspb.TypeRestore)

case "new-schema-change":
server := lastCreatedServer
user := "root"
jobType := "NEW SCHEMA CHANGE"

// First, run the schema change.
_, err := ds.getSQLDB(t, server, user).Exec(d.Input)

// Tag the job.
if d.HasArg("tag") {
tagJob(t, server, user, jobType, ds, d)
}

// Check if the job must be run aost.
if d.HasArg("aost") {
var aost string
d.ScanArgs(t, "aost", &aost)
}

// Check if we expect a pausepoint error.
if d.HasArg("expect-pausepoint") {
expectPausepoint(t, err, jobType, server, user, ds)
ret := append(ds.noticeBuffer, "job paused at pausepoint")
return strings.Join(ret, "\n")
}

// All other errors are bad.
require.NoError(t, err)
return ""
return execWithTagAndPausePoint(jobspb.TypeNewSchemaChange)

case "schema-change":
server := lastCreatedServer
user := "root"
jobType := "SCHEMA CHANGE"

// First, run the schema change.
_, err := ds.getSQLDB(t, server, user).Exec(d.Input)

// Tag the job.
if d.HasArg("tag") {
tagJob(t, server, user, jobType, ds, d)
}

// Check if the job must be run aost.
if d.HasArg("aost") {
var aost string
d.ScanArgs(t, "aost", &aost)
}

// Check if we expect a pausepoint error.
if d.HasArg("expect-pausepoint") {
expectPausepoint(t, err, jobType, server, user, ds)
ret := append(ds.noticeBuffer, "job paused at pausepoint")
return strings.Join(ret, "\n")
}

// All other errors are bad.
require.NoError(t, err)
return ""
return execWithTagAndPausePoint(jobspb.TypeSchemaChange)

case "job":
server := lastCreatedServer
user := "root"
const user = "root"

if d.HasArg("cancel") {
var cancelJobTag string
Expand Down Expand Up @@ -821,7 +740,7 @@ func TestDataDriven(t *testing.T) {

case "save-cluster-ts":
server := lastCreatedServer
user := "root"
const user = "root"
var timestampTag string
d.ScanArgs(t, "tag", &timestampTag)
if _, ok := ds.clusterTimestamps[timestampTag]; ok {
Expand Down Expand Up @@ -857,7 +776,7 @@ func TestDataDriven(t *testing.T) {

case "corrupt-backup":
server := lastCreatedServer
user := "root"
const user = "root"
var uri string
d.ScanArgs(t, "uri", &uri)
parsedURI, err := url.Parse(strings.Replace(uri, "'", "", -1))
Expand Down Expand Up @@ -923,43 +842,3 @@ func handleKVRequest(
t.Fatalf("Unknown kv request")
}
}

// findMostRecentJobWithType returns the ID of the most recently created job
// whose job_type matches jobType, failing the test on any lookup error.
func findMostRecentJobWithType(
	t *testing.T, ds datadrivenTestState, server, user string, jobType string,
) jobspb.JobID {
	const q = `SELECT job_id FROM [SHOW JOBS] WHERE job_type = '%s' ORDER BY created DESC LIMIT 1`
	var id jobspb.JobID
	row := ds.getSQLDB(t, server, user).QueryRow(fmt.Sprintf(q, jobType))
	require.NoError(t, row.Scan(&id))
	return id
}

// expectPausepoint asserts that the statement failed with a pause-point error
// and then waits for the most recent job of jobType to enter the paused state.
func expectPausepoint(
	t *testing.T, err error, jobType, server, user string, ds datadrivenTestState,
) {
	// The statement must have failed for a pausepoint to have been hit.
	require.NotNilf(t, err, "expected pause point error")

	jobID := findMostRecentJobWithType(t, ds, server, user, jobType)
	sqlRunner := sqlutils.MakeSQLRunner(ds.getSQLDB(t, server, user))
	jobutils.WaitForJobToPause(t, sqlRunner, jobID)
}

// tagJob records the ID of the most recent job of jobType under the tag given
// in the datadriven directive, so later directives can refer to the job by tag.
func tagJob(
	t *testing.T, server, user, jobType string, ds datadrivenTestState, d *datadriven.TestData,
) {
	var tag string
	d.ScanArgs(t, "tag", &tag)
	if _, dup := ds.jobTags[tag]; dup {
		t.Fatalf("failed to `tag`, job with tag %s already exists", tag)
	}
	ds.jobTags[tag] = findMostRecentJobWithType(t, ds, server, user, jobType)
}
65 changes: 37 additions & 28 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2337,7 +2337,12 @@ func (r *restoreResumer) dropDescriptors(

// Delete any schema descriptors that this restore created. Also collect the
// descriptors so we can update their parent databases later.
dbsWithDeletedSchemas := make(map[descpb.ID][]catalog.Descriptor)
type dbWithDeletedSchemas struct {
db *dbdesc.Mutable
schemas []catalog.Descriptor
}

dbsWithDeletedSchemas := make(map[descpb.ID]dbWithDeletedSchemas)
for _, schemaDesc := range details.SchemaDescs {
// We need to ignore descriptors we just added since we haven't committed the txn that deletes these.
isSchemaEmpty, err := isSchemaEmpty(ctx, txn, schemaDesc.GetID(), allDescs, ignoredChildDescIDs)
Expand All @@ -2354,49 +2359,53 @@ func (r *restoreResumer) dropDescriptors(
if err != nil {
return err
}
entry, hasEntry := dbsWithDeletedSchemas[schemaDesc.GetParentID()]
if !hasEntry {
mutParent, err := descsCol.GetMutableDescriptorByID(ctx, txn, schemaDesc.GetParentID())
if err != nil {
return err
}
entry.db = mutParent.(*dbdesc.Mutable)
}

// Mark schema as dropped and add uncommitted version to pass pre-txn
// descriptor validation.
// Delete schema entries in descriptor and namespace system tables.
b.Del(catalogkeys.EncodeNameKey(codec, mutSchema))
b.Del(catalogkeys.MakeDescMetadataKey(codec, mutSchema.GetID()))
descsCol.NotifyOfDeletedDescriptor(mutSchema.GetID())
// Add dropped descriptor as uncommitted to satisfy descriptor validation.
mutSchema.SetDropped()
mutSchema.MaybeIncrementVersion()
if err := descsCol.AddUncommittedDescriptor(ctx, mutSchema); err != nil {
return err
}

b.Del(catalogkeys.EncodeNameKey(codec, mutSchema))
b.Del(catalogkeys.MakeDescMetadataKey(codec, mutSchema.GetID()))
descsCol.NotifyOfDeletedDescriptor(mutSchema.GetID())
dbsWithDeletedSchemas[mutSchema.GetParentID()] = append(dbsWithDeletedSchemas[mutSchema.GetParentID()], mutSchema)
// Remove the back-reference to the deleted schema in the parent database.
if schemaInfo, ok := entry.db.Schemas[schemaDesc.GetName()]; !ok {
log.Warningf(ctx, "unexpected missing schema entry for %s from db %d; skipping deletion",
schemaDesc.GetName(), entry.db.GetID())
} else if schemaInfo.ID != schemaDesc.GetID() {
log.Warningf(ctx, "unexpected schema entry %d for %s from db %d, expecting %d; skipping deletion",
schemaInfo.ID, schemaDesc.GetName(), entry.db.GetID(), schemaDesc.GetID())
} else {
delete(entry.db.Schemas, schemaDesc.GetName())
}

entry.schemas = append(entry.schemas, mutSchema)
dbsWithDeletedSchemas[entry.db.GetID()] = entry
}

// For each database that had a child schema deleted (regardless of whether
// the db was created in the restore job), if it wasn't deleted just now,
// delete the now-deleted child schema from its schema map.
// write the updated descriptor with the now-deleted child schemas from its
// schema map.
//
// This cleanup must be done prior to dropping the database descriptors in the
// loop below so that we do not accidentally `b.Put` the descriptor with the
// modified schema slice after we have issued a `b.Del` to drop it.
for dbID, schemas := range dbsWithDeletedSchemas {
log.Infof(ctx, "deleting %d schema entries from database %d", len(schemas), dbID)
desc, err := descsCol.GetMutableDescriptorByID(ctx, txn, dbID)
if err != nil {
return err
}
db := desc.(*dbdesc.Mutable)
for _, sc := range schemas {
if schemaInfo, ok := db.Schemas[sc.GetName()]; !ok {
log.Warningf(ctx, "unexpected missing schema entry for %s from db %d; skipping deletion",
sc.GetName(), dbID)
} else if schemaInfo.ID != sc.GetID() {
log.Warningf(ctx, "unexpected schema entry %d for %s from db %d, expecting %d; skipping deletion",
schemaInfo.ID, sc.GetName(), dbID, sc.GetID())
} else {
delete(db.Schemas, sc.GetName())
}
}

for dbID, entry := range dbsWithDeletedSchemas {
log.Infof(ctx, "deleting %d schema entries from database %d", len(entry.schemas), dbID)
if err := descsCol.WriteDescToBatch(
ctx, false /* kvTrace */, db, b,
ctx, false /* kvTrace */, entry.db, b,
); err != nil {
return err
}
Expand Down
Loading

0 comments on commit 01eb6ab

Please sign in to comment.