Skip to content

Commit

Permalink
backupccl: drop function if restore failed
Browse files Browse the repository at this point in the history
This commit adds logic to cleaning logic on restore failures,
so that function descriptors are dropped if restore failed.

Release note: None.
Release justification: necessary bug fix to make sure backup
and restore work with udf.
  • Loading branch information
chengxiong-ruan committed Sep 12, 2022
1 parent ef4656f commit 1e2f6da
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 1 deletion.
70 changes: 69 additions & 1 deletion pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8057,6 +8057,7 @@ func TestCleanupDoesNotDeleteParentsWithChildObjects(t *testing.T) {
CREATE SCHEMA sc;
CREATE TABLE sc.tb (x INT);
CREATE TYPE sc.typ AS ENUM ('hello');
CREATE FUNCTION sc.f() RETURNS INT LANGUAGE SQL AS $$ SELECT 1 $$;
`)

// Back up the database.
Expand All @@ -8076,7 +8077,8 @@ func TestCleanupDoesNotDeleteParentsWithChildObjects(t *testing.T) {
<-afterPublishNotif
// Create a table in the database we just made public for which the RESTORE
// job isn't actually finished.
sqlDB.Exec(t, `CREATE TABLE d.public.new_table()`)
sqlDB.Exec(t, `USE d;`)
sqlDB.Exec(t, `CREATE TABLE public.new_table()`)
close(continueNotif)
require.NoError(t, g.Wait())

Expand All @@ -8089,6 +8091,7 @@ func TestCleanupDoesNotDeleteParentsWithChildObjects(t *testing.T) {
{"public", "new_table"},
})
sqlDB.CheckQueryResults(t, `SHOW TYPES`, [][]string{})
sqlDB.CheckQueryResults(t, `SELECT * FROM crdb_internal.create_function_statements`, [][]string{})
})

t.Run("clean-up-schema-with-table", func(t *testing.T) {
Expand Down Expand Up @@ -8159,6 +8162,71 @@ func TestCleanupDoesNotDeleteParentsWithChildObjects(t *testing.T) {
{"sc", "new_table"},
})
})

t.Run("clean-up-database-with-udf", func(t *testing.T) {
ctx := context.Background()
tc, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, singleNode, 0, InitManualReplication)
defer cleanupFn()
ctx, cancel := context.WithCancel(ctx)
defer cancel()

for _, server := range tc.Servers {
registry := server.JobRegistry().(*jobs.Registry)
registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*restoreResumer)
r.testingKnobs.afterPublishingDescriptors = func() error {
notifyContinue(ctx)
return errors.New("injected error")
}
return r
},
}
}

sqlDB.Exec(t, `
CREATE DATABASE d;
USE d;
CREATE SCHEMA sc1;
CREATE FUNCTION sc1.f1() RETURNS INT LANGUAGE SQL AS $$ SELECT 1 $$;
CREATE SCHEMA sc2;
`)

// Back up the database.
sqlDB.Exec(t, `BACKUP DATABASE d TO 'nodelocal://0/test/'`)

// Drop the database and restore into it.
sqlDB.Exec(t, `DROP DATABASE d`)

afterPublishNotif, continueNotif := notifyAfterPublishing()
g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
_, err := sqlDB.DB.ExecContext(ctx, `RESTORE DATABASE d FROM 'nodelocal://0/test/'`)
require.Regexp(t, "injected error", err)
return nil
})

<-afterPublishNotif
// Create a table in the database we just made public for which the RESTORE
// job isn't actually finished.
sqlDB.Exec(t, `
USE d;
CREATE FUNCTION sc2.f2(a INT) RETURNS INT LANGUAGE SQL AS $$ SELECT a $$;`)
close(continueNotif)
require.NoError(t, g.Wait())

// Check that the restored database still exists, but only contains the new
// table we added.
sqlDB.CheckQueryResults(t, `SELECT schema_name FROM [SHOW SCHEMAS FROM d] ORDER BY 1`, [][]string{
{"crdb_internal"}, {"information_schema"}, {"pg_catalog"}, {"pg_extension"}, {"public"}, {"sc2"},
})
sqlDB.CheckQueryResults(
t,
`SELECT database_name, schema_name, function_name FROM crdb_internal.create_function_statements`,
[][]string{{"d", "sc2", "f2"}},
)
})

}

func TestReadBackupManifestMemoryMonitoring(t *testing.T) {
Expand Down
19 changes: 19 additions & 0 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2339,6 +2339,22 @@ func (r *restoreResumer) dropDescriptors(
descsCol.NotifyOfDeletedDescriptor(mutType.GetID())
}

for i := range details.FunctionDescs {
fnDesc := details.FunctionDescs[i]
mutFn, err := descsCol.GetMutableFunctionByID(ctx, txn, fnDesc.ID, tree.ObjectLookupFlags{
CommonLookupFlags: tree.CommonLookupFlags{
AvoidLeased: true,
IncludeOffline: true,
},
})
if err != nil {
return err
}
mutFn.SetDropped()
b.Del(catalogkeys.MakeDescMetadataKey(codec, fnDesc.ID))
descsCol.NotifyOfDeletedDescriptor(fnDesc.ID)
}

// Queue a GC job.
gcDetails := jobspb.SchemaChangeGCDetails{}
for _, tableID := range tablesToGC {
Expand Down Expand Up @@ -2372,6 +2388,9 @@ func (r *restoreResumer) dropDescriptors(
for _, schema := range details.SchemaDescs {
ignoredChildDescIDs[schema.ID] = struct{}{}
}
for _, fn := range details.FunctionDescs {
ignoredChildDescIDs[fn.ID] = struct{}{}
}
all, err := descsCol.GetAllDescriptors(ctx, txn)
if err != nil {
return err
Expand Down

0 comments on commit 1e2f6da

Please sign in to comment.