Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop the table if restore is cancelled by signature not matched #58

Merged
merged 1 commit into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 52 additions & 13 deletions pkg/ccr/base/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package base
import (
"database/sql"
"fmt"
"regexp"
"strconv"
"strings"
"time"
Expand All @@ -14,10 +15,13 @@ import (
log "github.com/sirupsen/logrus"
)

var ErrRestoreSignatureNotMatched = xerror.NewWithoutStack(xerror.Normal, "The signature is not matched, the table already exist but with different schema")

const (
BACKUP_CHECK_DURATION = time.Second * 3
RESTORE_CHECK_DURATION = time.Second * 3
MAX_CHECK_RETRY_TIMES = 86400 // 3 day
SIGNATURE_NOT_MATCHED = "already exist but with different schema"
)

type BackupState int
Expand Down Expand Up @@ -46,7 +50,6 @@ func (s BackupState) String() string {

func formatKeyWordName(name string) string {
return "`" + strings.TrimSpace(name) + "`"

}

func ParseBackupState(state string) BackupState {
Expand Down Expand Up @@ -534,51 +537,59 @@ func (s *Spec) CheckBackupFinished(snapshotName string) (bool, error) {
}

// TODO: Add TaskErrMsg
func (s *Spec) checkRestoreFinished(snapshotName string) (RestoreState, error) {
func (s *Spec) checkRestoreFinished(snapshotName string) (RestoreState, string, error) {
log.Debugf("check restore state %s", snapshotName)

db, err := s.Connect()
if err != nil {
return RestoreStateUnknown, err
return RestoreStateUnknown, "", err
}

query := fmt.Sprintf("SHOW RESTORE FROM %s WHERE Label = \"%s\"", formatKeyWordName(s.Database), snapshotName)

log.Debugf("check restore state sql: %s", query)
rows, err := db.Query(query)
if err != nil {
return RestoreStateUnknown, xerror.Wrap(err, xerror.Normal, "query restore state failed")
return RestoreStateUnknown, "", xerror.Wrap(err, xerror.Normal, "query restore state failed")
}
defer rows.Close()

var restoreStateStr string
var restoreStatusStr string
if rows.Next() {
rowParser := utils.NewRowParser()
if err := rowParser.Parse(rows); err != nil {
return RestoreStateUnknown, xerror.Wrap(err, xerror.Normal, "scan restore state failed")
return RestoreStateUnknown, "", xerror.Wrap(err, xerror.Normal, "scan restore state failed")
}
restoreStateStr, err = rowParser.GetString("State")
if err != nil {
return RestoreStateUnknown, xerror.Wrap(err, xerror.Normal, "scan restore state failed")
return RestoreStateUnknown, "", xerror.Wrap(err, xerror.Normal, "scan restore state failed")
}
restoreStatusStr, err = rowParser.GetString("Status")
if err != nil {
return RestoreStateUnknown, "", xerror.Wrap(err, xerror.Normal, "scan restore status failed")
}

log.Infof("check snapshot %s restore state: [%v]", snapshotName, restoreStateStr)
log.Infof("check snapshot %s restore state: [%v], restore status: %s",
snapshotName, restoreStateStr, restoreStatusStr)

return _parseRestoreState(restoreStateStr), nil
return _parseRestoreState(restoreStateStr), restoreStatusStr, nil
}
return RestoreStateUnknown, xerror.Errorf(xerror.Normal, "no restore state found")
return RestoreStateUnknown, "", xerror.Errorf(xerror.Normal, "no restore state found")
}

func (s *Spec) CheckRestoreFinished(snapshotName string) (bool, error) {
log.Debugf("check restore state is finished, spec: %s, datebase: %s, snapshot: %s", s.String(), s.Database, snapshotName)

for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ {
if backupState, err := s.checkRestoreFinished(snapshotName); err != nil {
if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil {
return false, err
} else if backupState == RestoreStateFinished {
} else if restoreState == RestoreStateFinished {
return true, nil
} else if backupState == RestoreStateCancelled {
return false, xerror.Errorf(xerror.Normal, "backup failed or canceled, spec: %s, snapshot: %s", s.String(), snapshotName)
} else if restoreState == RestoreStateCancelled && strings.Contains(status, SIGNATURE_NOT_MATCHED) {
return false, xerror.XWrapf(ErrRestoreSignatureNotMatched, "restore failed, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status)
} else if restoreState == RestoreStateCancelled {
return false, xerror.Errorf(xerror.Normal, "restore failed or canceled, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status)
} else {
// RestoreStatePending, RestoreStateUnknown
time.Sleep(RESTORE_CHECK_DURATION)
Expand All @@ -589,6 +600,34 @@ func (s *Spec) CheckRestoreFinished(snapshotName string) (bool, error) {
return false, nil
}

func (s *Spec) GetRestoreSignatureNotMatchedTable(snapshotName string) (string, error) {
log.Debugf("get restore signature not matched table, spec: %s, datebase: %s, snapshot: %s", s.String(), s.Database, snapshotName)

for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ {
if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil {
return "", err
} else if restoreState == RestoreStateFinished {
return "", nil
} else if restoreState == RestoreStateCancelled && strings.Contains(status, SIGNATURE_NOT_MATCHED) {
pattern := regexp.MustCompile("Table (?P<tableName>.*) already exist but with different schema")
matches := pattern.FindStringSubmatch(status)
index := pattern.SubexpIndex("tableName")
if len(matches) < index && len(matches[index]) == 0 {
return "", xerror.Errorf(xerror.Normal, "match table name from restore status failed, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status)
}
return matches[index], nil
} else if restoreState == RestoreStateCancelled {
return "", xerror.Errorf(xerror.Normal, "restore failed or canceled, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status)
} else {
// RestoreStatePending, RestoreStateUnknown
time.Sleep(RESTORE_CHECK_DURATION)
}
}

log.Warnf("get restore signature not matched timeout, max try times: %d, spec: %s, snapshot: %s", MAX_CHECK_RETRY_TIMES, s, snapshotName)
return "", nil
}

func (s *Spec) waitTransactionDone(txnId int64) error {
db, err := s.Connect()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/ccr/base/specer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Specer interface {
CheckTableExists() (bool, error)
CreateSnapshotAndWaitForDone(tables []string) (string, error)
CheckRestoreFinished(snapshotName string) (bool, error)
GetRestoreSignatureNotMatchedTable(snapshotName string) (string, error)
WaitTransactionDone(txnId int64) // busy wait

Exec(sql string) error
Expand Down
23 changes: 22 additions & 1 deletion pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,28 @@ func (j *Job) fullSync() error {

for {
restoreFinished, err := j.IDest.CheckRestoreFinished(restoreSnapshotName)
if err != nil {
if err != nil && errors.Is(err, base.ErrRestoreSignatureNotMatched) {
// We need rebuild the exists table.
var tableName string
if j.SyncType == TableSync {
tableName = j.Dest.Table
} else {
tableName, err = j.IDest.GetRestoreSignatureNotMatchedTable(restoreSnapshotName)
if err != nil || len(tableName) == 0 {
continue
}
}
log.Infof("the signature of table %s is not matched with the target table in snapshot", tableName)
for {
dropSql := fmt.Sprintf("DROP TABLE %s FORCE", tableName)
log.Infof("drop table sql: %s", dropSql)
if err := j.destMeta.DbExec(dropSql); err == nil {
break
}
}
log.Infof("the restore is cancelled, the unmatched table %s is dropped, restore snapshot again", tableName)
break
} else if err != nil {
return err
}

Expand Down