Skip to content

Commit

Permalink
support restore publication db (#1.2-dev) (#16488)
Browse files Browse the repository at this point in the history
  • Loading branch information
YANGGMM authored Jun 4, 2024
1 parent ef6427d commit 1ef0439
Show file tree
Hide file tree
Showing 5 changed files with 985 additions and 8 deletions.
19 changes: 11 additions & 8 deletions pkg/frontend/authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -1479,6 +1479,7 @@ const (
getAccountIdAndStatusFormat = `select account_id,status from mo_catalog.mo_account where account_name = '%s';`
getPubInfoForSubFormat = `select database_name,account_list from mo_catalog.mo_pubs where pub_name = "%s";`
getDbPubCountFormat = `select count(1) from mo_catalog.mo_pubs where database_name = '%s';`
deletePubFromDatabaseFormat = `delete from mo_catalog.mo_pubs where database_name = '%s';`

fetchSqlOfSpFormat = `select body, args from mo_catalog.mo_stored_procedure where name = '%s' and db = '%s' order by proc_id;`
)
Expand Down Expand Up @@ -1888,6 +1889,14 @@ func getSqlForDbPubCount(ctx context.Context, dbName string) (string, error) {
return fmt.Sprintf(getDbPubCountFormat, dbName), nil
}

func getSqlForDeletePubFromDatabase(ctx context.Context, dbName string) (string, error) {
err := inputNameIsInvalid(ctx, dbName)
if err != nil {
return "", err
}
return fmt.Sprintf(deletePubFromDatabaseFormat, dbName), nil
}

func getSqlForCheckDatabase(ctx context.Context, dbName string) (string, error) {
err := inputNameIsInvalid(ctx, dbName)
if err != nil {
Expand Down Expand Up @@ -3408,7 +3417,7 @@ func checkSubscriptionValid(ctx context.Context, ses FeSession, createSql string
}

func isDbPublishing(ctx context.Context, dbName string, ses FeSession) (ok bool, err error) {
bh := ses.GetBackgroundExec(ctx)
bh := ses.GetShareTxnBackgroundExec(ctx, false)
defer bh.Close()
var (
sql string
Expand All @@ -3424,13 +3433,7 @@ func isDbPublishing(ctx context.Context, dbName string, ses FeSession) (ok bool,
if err != nil {
return false, err
}
err = bh.Exec(ctx, "begin;")
defer func() {
err = finishTxn(ctx, bh, err)
}()
if err != nil {
return false, err
}

bh.ClearExecResultSet()
err = bh.Exec(ctx, sql)
if err != nil {
Expand Down
129 changes: 129 additions & 0 deletions pkg/frontend/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type tableType string

const view tableType = "VIEW"

const (
PubDbName = "mo_pubs"
)

var (
insertIntoMoSnapshots = `insert into mo_catalog.mo_snapshots(
snapshot_id,
Expand All @@ -60,6 +64,10 @@ var (

restoreTableDataFmt = "insert into `%s`.`%s` SELECT * FROM `%s`.`%s` {snapshot = '%s'}"

getDbPubCountWithSnapshotFormat = `select count(1) from mo_catalog.mo_pubs {snapshot = '%s'} where database_name = '%s';`

restorePubDbDataFmt = "insert into `%s`.`%s` SELECT * FROM `%s`.`%s` {snapshot = '%s'} WHERE DATABASE_NAME = '%s'"

skipDbs = []string{"mysql", "system", "system_metrics", "mo_task", "mo_debug", "information_schema", "mo_catalog"}

needSkipTablesInMocatalog = map[string]int8{
Expand Down Expand Up @@ -98,6 +106,14 @@ var (
}
)

func getSqlForGetDbPubCountWithSnapshot(ctx context.Context, snapshot string, dbName string) (string, error) {
err := inputNameIsInvalid(ctx, dbName)
if err != nil {
return "", err
}
return fmt.Sprintf(getDbPubCountWithSnapshotFormat, snapshot, dbName), nil
}

type snapshotRecord struct {
snapshotId string
snapshotName string
Expand Down Expand Up @@ -454,6 +470,11 @@ func restoreToAccount(
continue
}

// do some op to pub database
if err := checkPubAndDropPubRecord(ctx, bh, snapshotName, dbName); err != nil {
return err
}

getLogger().Info(fmt.Sprintf("[%s]drop current exists db: %v", snapshotName, dbName))
if err = bh.Exec(toCtx, fmt.Sprintf("drop database if exists %s", dbName)); err != nil {
return
Expand Down Expand Up @@ -533,6 +554,12 @@ func restoreToDatabaseOrTable(
return
}

if !restoreToTbl {
if err = checkAndRestorePublicationRecord(ctx, bh, snapshotName, dbName, toAccountId); err != nil {
return
}
}

tableInfos, err := getTableInfos(ctx, bh, snapshotName, dbName, tblName)
if err != nil {
return
Expand Down Expand Up @@ -1156,3 +1183,105 @@ func fkTablesTopoSort(ctx context.Context, bh BackgroundExec, snapshotName strin
sortedTbls, err = g.sort()
return
}

// checkPubAndDropPubRecord checks if the database is publicated, if so, delete the publication
func checkPubAndDropPubRecord(
ctx context.Context,
bh BackgroundExec,
snapshotName string,
dbName string) (err error) {
// check if the database is publicated
sql, err := getSqlForDbPubCount(ctx, dbName)
if err != nil {
return
}
getLogger().Info(fmt.Sprintf("[%s] start to check if db '%v' is publicated, check sql: %s", snapshotName, dbName, sql))

bh.ClearExecResultSet()
if err = bh.Exec(ctx, sql); err != nil {
return
}

erArray, err := getResultSet(ctx, bh)
if err != nil {
return
}

if execResultArrayHasData(erArray) {
var pubCount int64
if pubCount, err = erArray[0].GetInt64(ctx, 0, 0); err != nil {
return
}
if pubCount > 0 {
// drop the publication
sql, err = getSqlForDeletePubFromDatabase(ctx, dbName)
if err != nil {
return
}
getLogger().Info(fmt.Sprintf("[%s] start to drop publication for db '%v', drop sql: %s", snapshotName, dbName, sql))
if err = bh.Exec(ctx, sql); err != nil {
return
}
}
}
return
}

// checkAndRestorePublicationRecord checks if the database is publicated, if so, restore the publication record
func checkAndRestorePublicationRecord(
ctx context.Context,
bh BackgroundExec,
snapshotName string,
dbName string,
toAccountId uint32) (err error) {

// check if the database is publicated
sql, err := getSqlForGetDbPubCountWithSnapshot(ctx, snapshotName, dbName)
if err != nil {
return
}

getLogger().Info(fmt.Sprintf("[%s] start to check if db '%v' is publicated, check sql: %s", snapshotName, dbName, sql))

bh.ClearExecResultSet()
if err = bh.Exec(ctx, sql); err != nil {
return
}

erArray, err := getResultSet(ctx, bh)
if err != nil {
return
}

if execResultArrayHasData(erArray) {
var pubCount int64
if pubCount, err = erArray[0].GetInt64(ctx, 0, 0); err != nil {
return
}
if pubCount > 0 {
// restore the publication record
var curAccountId uint32
curAccountId, err = defines.GetAccountId(ctx)
if err != nil {
return
}

ctx = defines.AttachAccountId(ctx, toAccountId)

// insert data
insertIntoSql := fmt.Sprintf(restorePubDbDataFmt, moCatalog, PubDbName, moCatalog, PubDbName, snapshotName, dbName)
getLogger().Info(fmt.Sprintf("[%s] start to restore db '%s' pub record, insert sql: %s", snapshotName, PubDbName, insertIntoSql))

if curAccountId == toAccountId {
if err = bh.Exec(ctx, insertIntoSql); err != nil {
return
}
} else {
if err = bh.ExecRestore(ctx, insertIntoSql, curAccountId, toAccountId); err != nil {
return
}
}
}
}
return
}
1 change: 1 addition & 0 deletions pkg/frontend/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ type FeSession interface {
ExitFPrint(idx int)
SetStaticTxnId(id []byte)
GetStaticTxnId() uuid.UUID
GetShareTxnBackgroundExec(ctx context.Context, newRawBatch bool) BackgroundExec
SessionLogger
}

Expand Down
Loading

0 comments on commit 1ef0439

Please sign in to comment.