Skip to content

Commit

Permalink
support restore dropped account from pitr (#20803)
Browse files Browse the repository at this point in the history
support restore dropped account from pitr

Approved by: @daviszhen
  • Loading branch information
YANGGMM authored Dec 19, 2024
1 parent ddb306b commit e57f0c5
Show file tree
Hide file tree
Showing 5 changed files with 477 additions and 129 deletions.
126 changes: 57 additions & 69 deletions pkg/frontend/pitr.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,11 +891,12 @@ func doRestorePitr(ctx context.Context, ses *Session, stmt *tree.RestorePitr) (s
}()

var (
restoreLevel tree.RestoreLevel
ts int64
pitrExist bool
sortedFkTbls []string
fkTableMap map[string]*tableInfo
restoreLevel tree.RestoreLevel
ts int64
pitrExist bool
sortedFkTbls []string
fkTableMap map[string]*tableInfo
accountRecord *accountRecord
)
// resolve timestamp
ts, err = doResolveTimeStamp(stmt.TimeStamp)
Expand All @@ -909,6 +910,9 @@ func doRestorePitr(ctx context.Context, ses *Session, stmt *tree.RestorePitr) (s
dbName := string(stmt.DatabaseName)
tblName := string(stmt.TableName)

isClusterRestore := false
isNeedToCleanToDatabase := true

// restore as a txn
if err = bh.Exec(ctx, "begin;"); err != nil {
return stats, err
Expand Down Expand Up @@ -953,37 +957,61 @@ func doRestorePitr(ctx context.Context, ses *Session, stmt *tree.RestorePitr) (s
restoreOtherAccount := func() (rtnErr error) {
fromAccount := string(stmt.SrcAccountName)
var (
fromAccountId uint32
toAccountId uint32
toAccountId uint32
)

if len(fromAccount) == 0 {
// using account level pitr
fromAccount = pitr.accountName
fromAccountId = uint32(pitr.accountId)
accountRecord, rtnErr = getAccountRecordByTs(ctx, ses, bh, pitrName, ts, fromAccount)
if rtnErr != nil {
return
}
if fromAccount == accountName {
// restore to the same account
toAccountId = fromAccountId
getLogger(ses.GetService()).Info("restore to the same account", zap.String("fromAccount", accountName), zap.String("toAccount", accountName))
toAccountId, rtnErr = getAccountId(ctx, bh, accountName)
if rtnErr != nil {
// need create a new account
if rtnErr = createDroppedAccount(ctx, ses, bh, pitrName, *accountRecord); rtnErr != nil {
return
}

if toAccountId, rtnErr = getAccountId(ctx, bh, accountRecord.accountName); rtnErr != nil {
return
}
}
} else {
// restore to new account
getLogger(ses.GetService()).Info("restore to the same account", zap.String("fromAccount", fromAccount), zap.String("toAccount", accountName))
toAccountId, rtnErr = getAccountId(ctx, bh, accountName)
if rtnErr != nil {
return rtnErr
return
}
}
} else {
// using cluster level pitr
accountRecord, rtnErr = getAccountRecordByTs(ctx, ses, bh, pitrName, ts, fromAccount)
if rtnErr != nil {
return
}
if fromAccount == accountName {
// restore to the same account
fromAccountId, rtnErr = getAccountId(ctx, bh, fromAccount)
getLogger(ses.GetService()).Info("restore to the same account", zap.String("fromAccount", accountName), zap.String("toAccount", accountName))
toAccountId, rtnErr = getAccountId(ctx, bh, fromAccount)
if rtnErr != nil {
return rtnErr
// need create a new account
if rtnErr = createDroppedAccount(ctx, ses, bh, pitrName, *accountRecord); rtnErr != nil {
return
}

if toAccountId, rtnErr = getAccountId(ctx, bh, accountRecord.accountName); rtnErr != nil {
return
}
}
toAccountId = fromAccountId
} else {
// restore to new account
fromAccountId, rtnErr = getAccountId(ctx, bh, fromAccount)
if rtnErr != nil {
return rtnErr
}
getLogger(ses.GetService()).Info("restore to the same account", zap.String("fromAccount", fromAccount), zap.String("toAccount", accountName))
toAccountId, rtnErr = getAccountId(ctx, bh, accountName)
if rtnErr != nil {
return rtnErr
Expand All @@ -992,62 +1020,22 @@ func doRestorePitr(ctx context.Context, ses *Session, stmt *tree.RestorePitr) (s
}

// check account exists or not
var accountExist bool
if accountExist, rtnErr = doCheckAccountExistsInPitrRestore(ctx, ses.GetService(), bh, pitrName, ts, fromAccount, ses.GetAccountId()); rtnErr != nil {
return rtnErr
}
if !accountExist {
return moerr.NewInternalErrorf(ctx, "account `%s` does not exists at timestamp: %v", fromAccount, nanoTimeFormat(ts))
}
// mock snapshot
var snapshotName string
snapshotName, rtnErr = insertSnapshotRecord(ctx, ses.GetService(), bh, pitrName, ts, uint64(fromAccountId), fromAccount)
defer func() {
deleteSnapshotRecord(ctx, ses.GetService(), bh, pitrName, snapshotName)
}()
if rtnErr != nil {
return rtnErr
}

restoreAccount := fromAccountId
// drop foreign key related tables first
rtnErr = deleteCurFkTables(ctx, ses.GetService(), bh, dbName, tblName, toAccountId)
if err != nil {
return
}

// get topo sorted tables with foreign key
sortedFkTbls, rtnErr = fkTablesTopoSort(ctx, bh, snapshotName, dbName, tblName)
if rtnErr != nil {
return
}

// get foreign key table infos
fkTableMap, rtnErr = getTableInfoMap(ctx, ses.GetService(), bh, snapshotName, dbName, tblName, sortedFkTbls)
if rtnErr != nil {
return
}

// collect views and tables during table restoration
viewMap := make(map[string]*tableInfo)

rtnErr = restoreToAccount(ctx, ses.GetService(), bh, snapshotName, toAccountId, fkTableMap, viewMap, ts, restoreAccount, false, nil)
rtnErr = restoreAccountUsingClusterSnapshotToNew(
ctx,
ses,
bh,
pitrName,
ts,
*accountRecord,
uint64(toAccountId),
nil,
isClusterRestore,
isNeedToCleanToDatabase,
)
if rtnErr != nil {
return rtnErr
}

if len(fkTableMap) > 0 {
if rtnErr = restoreTablesWithFk(ctx, ses.GetService(), bh, snapshotName, sortedFkTbls, fkTableMap, toAccountId, ts); rtnErr != nil {
return
}
}

if len(viewMap) > 0 {
if rtnErr = restoreViews(ctx, ses, bh, snapshotName, viewMap, toAccountId); rtnErr != nil {
return
}
}
// checks if the given context has been canceled.
if rtnErr = CancelCheck(ctx); rtnErr != nil {
return
}
Expand Down
Loading

0 comments on commit e57f0c5

Please sign in to comment.