Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support restore dropped account from pitr #20803

Merged
merged 13 commits into from
Dec 19, 2024
126 changes: 57 additions & 69 deletions pkg/frontend/pitr.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,11 +891,12 @@ func doRestorePitr(ctx context.Context, ses *Session, stmt *tree.RestorePitr) (s
}()

var (
restoreLevel tree.RestoreLevel
ts int64
pitrExist bool
sortedFkTbls []string
fkTableMap map[string]*tableInfo
restoreLevel tree.RestoreLevel
ts int64
pitrExist bool
sortedFkTbls []string
fkTableMap map[string]*tableInfo
accountRecord *accountRecord
)
// resolve timestamp
ts, err = doResolveTimeStamp(stmt.TimeStamp)
Expand All @@ -909,6 +910,9 @@ func doRestorePitr(ctx context.Context, ses *Session, stmt *tree.RestorePitr) (s
dbName := string(stmt.DatabaseName)
tblName := string(stmt.TableName)

isClusterRestore := false
isNeedToCleanToDatabase := true

// restore as a txn
if err = bh.Exec(ctx, "begin;"); err != nil {
return stats, err
Expand Down Expand Up @@ -953,37 +957,61 @@ func doRestorePitr(ctx context.Context, ses *Session, stmt *tree.RestorePitr) (s
restoreOtherAccount := func() (rtnErr error) {
fromAccount := string(stmt.SrcAccountName)
var (
fromAccountId uint32
toAccountId uint32
toAccountId uint32
)

if len(fromAccount) == 0 {
// using account level pitr
fromAccount = pitr.accountName
fromAccountId = uint32(pitr.accountId)
accountRecord, rtnErr = getAccountRecordByTs(ctx, ses, bh, pitrName, ts, fromAccount)
if rtnErr != nil {
return
}
if fromAccount == accountName {
// restore to the same account
toAccountId = fromAccountId
toAccountId, rtnErr = getAccountId(ctx, bh, accountName)
if rtnErr != nil {
// need create a new account
rtnErr = createDroppedAccount(ctx, ses, bh, pitrName, *accountRecord)
if rtnErr != nil {
return
}

toAccountId, rtnErr = getAccountId(ctx, bh, accountRecord.accountName)
if rtnErr != nil {
return
}
}
} else {
// restore to new account
toAccountId, rtnErr = getAccountId(ctx, bh, accountName)
if rtnErr != nil {
return rtnErr
return
}
}
} else {
// using cluster level pitr
accountRecord, rtnErr = getAccountRecordByTs(ctx, ses, bh, pitrName, ts, fromAccount)
if rtnErr != nil {
return
}
if fromAccount == accountName {
// restore to the same account
fromAccountId, rtnErr = getAccountId(ctx, bh, fromAccount)
toAccountId, rtnErr = getAccountId(ctx, bh, fromAccount)
if rtnErr != nil {
return rtnErr
// need create a new account
rtnErr = createDroppedAccount(ctx, ses, bh, pitrName, *accountRecord)
if rtnErr != nil {
return rtnErr
}

toAccountId, rtnErr = getAccountId(ctx, bh, accountRecord.accountName)
if rtnErr != nil {
return rtnErr
}
}
toAccountId = fromAccountId
} else {
// restore to new account
fromAccountId, rtnErr = getAccountId(ctx, bh, fromAccount)
if rtnErr != nil {
return rtnErr
}
toAccountId, rtnErr = getAccountId(ctx, bh, accountName)
if rtnErr != nil {
return rtnErr
Expand All @@ -992,62 +1020,22 @@ func doRestorePitr(ctx context.Context, ses *Session, stmt *tree.RestorePitr) (s
}

// check account exists or not
var accountExist bool
if accountExist, rtnErr = doCheckAccountExistsInPitrRestore(ctx, ses.GetService(), bh, pitrName, ts, fromAccount, ses.GetAccountId()); rtnErr != nil {
return rtnErr
}
if !accountExist {
return moerr.NewInternalErrorf(ctx, "account `%s` does not exists at timestamp: %v", fromAccount, nanoTimeFormat(ts))
}
// mock snapshot
var snapshotName string
snapshotName, rtnErr = insertSnapshotRecord(ctx, ses.GetService(), bh, pitrName, ts, uint64(fromAccountId), fromAccount)
defer func() {
deleteSnapshotRecord(ctx, ses.GetService(), bh, pitrName, snapshotName)
}()
if rtnErr != nil {
return rtnErr
}

restoreAccount := fromAccountId
// drop foreign key related tables first
rtnErr = deleteCurFkTables(ctx, ses.GetService(), bh, dbName, tblName, toAccountId)
if err != nil {
return
}

// get topo sorted tables with foreign key
sortedFkTbls, rtnErr = fkTablesTopoSort(ctx, bh, snapshotName, dbName, tblName)
if rtnErr != nil {
return
}

// get foreign key table infos
fkTableMap, rtnErr = getTableInfoMap(ctx, ses.GetService(), bh, snapshotName, dbName, tblName, sortedFkTbls)
if rtnErr != nil {
return
}

// collect views and tables during table restoration
viewMap := make(map[string]*tableInfo)

rtnErr = restoreToAccount(ctx, ses.GetService(), bh, snapshotName, toAccountId, fkTableMap, viewMap, ts, restoreAccount, false, nil)
rtnErr = restoreAccountUsingClusterSnapshotToNew(
ctx,
ses,
bh,
pitrName,
ts,
*accountRecord,
uint64(toAccountId),
nil,
isClusterRestore,
isNeedToCleanToDatabase,
)
if rtnErr != nil {
return rtnErr
}

if len(fkTableMap) > 0 {
if rtnErr = restoreTablesWithFk(ctx, ses.GetService(), bh, snapshotName, sortedFkTbls, fkTableMap, toAccountId, ts); rtnErr != nil {
return
}
}

if len(viewMap) > 0 {
if rtnErr = restoreViews(ctx, ses, bh, snapshotName, viewMap, toAccountId); rtnErr != nil {
return
}
}
// checks if the given context has been canceled.
if rtnErr = CancelCheck(ctx); rtnErr != nil {
return
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/frontend/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -1705,7 +1705,7 @@ func restoreToCluster(ctx context.Context,
return err
}
if newAccountId != uint32(account.accountId) {
if err = restoreAccountUsingClusterSnapshotToNew(ctx, ses, bh, snapshotName, snapshotTs, account, subDbToRestore, uint64(newAccountId), true, true); err != nil {
if err = restoreAccountUsingClusterSnapshotToNew(ctx, ses, bh, snapshotName, snapshotTs, account, uint64(newAccountId), subDbToRestore, true, true); err != nil {
return err
}
} else {
Expand Down Expand Up @@ -1738,7 +1738,7 @@ func restoreToCluster(ctx context.Context,
}

// 2.0 restore droped account to new account
err = restoreAccountUsingClusterSnapshotToNew(ctx, ses, bh, snapshotName, snapshotTs, account, subDbToRestore, uint64(newAccountId), true, false)
err = restoreAccountUsingClusterSnapshotToNew(ctx, ses, bh, snapshotName, snapshotTs, account, uint64(newAccountId), subDbToRestore, true, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -1793,7 +1793,7 @@ func restoreToAccountUsingCluster(
}
}

err = restoreAccountUsingClusterSnapshotToNew(ctx, ses, bh, snapshotName, snapshotTs, *ar, nil, uint64(toAccountId), false, isNeedToCleanToDatabase)
err = restoreAccountUsingClusterSnapshotToNew(ctx, ses, bh, snapshotName, snapshotTs, *ar, uint64(toAccountId), nil, false, isNeedToCleanToDatabase)
if err != nil {
return err
}
Expand Down Expand Up @@ -1965,8 +1965,8 @@ func restoreAccountUsingClusterSnapshotToNew(ctx context.Context,
snapshotName string,
snapshotTs int64,
account accountRecord,
subDbToRestore map[string]*subDbRestoreRecord,
toAccountId uint64,
subDbToRestore map[string]*subDbRestoreRecord,
isRestoreCluster bool,
isNeedToCleanToDatabase bool,
) (err error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/frontend/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,14 @@ func Test_restoreAccountUsingClusterSnapshotToNew(t *testing.T) {

ctx = context.WithValue(ctx, defines.TenantIDKey{}, uint32(sysAccountID))

err := restoreAccountUsingClusterSnapshotToNew(ctx, ses, bh, "sp01", 0, accountRecord{accountName: "sys", accountId: 0}, nil, 0, false, false)
err := restoreAccountUsingClusterSnapshotToNew(ctx, ses, bh, "sp01", 0, accountRecord{accountName: "sys", accountId: 0}, 0, nil, false, false)
convey.So(err, convey.ShouldNotBeNil)

sql := "select db_name, table_name, refer_db_name, refer_table_name from mo_catalog.mo_foreign_keys"
mrs := newMrsForPitrRecord([][]interface{}{{"db1", "table1", "db2", "table2"}})
bh.sql2result[sql] = mrs

err = restoreAccountUsingClusterSnapshotToNew(ctx, ses, bh, "sp01", 0, accountRecord{accountName: "sys", accountId: 0}, nil, 0, false, false)
err = restoreAccountUsingClusterSnapshotToNew(ctx, ses, bh, "sp01", 0, accountRecord{accountName: "sys", accountId: 0}, 0, nil, false, false)
convey.So(err, convey.ShouldNotBeNil)
})
}
Loading