Skip to content

Commit

Permalink
Drop the table if restore is cancelled by signature not matched (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter authored Apr 16, 2024
1 parent 9204724 commit b6bffd3
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 14 deletions.
65 changes: 52 additions & 13 deletions pkg/ccr/base/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package base
import (
"database/sql"
"fmt"
"regexp"
"strconv"
"strings"
"time"
Expand All @@ -14,10 +15,13 @@ import (
log "github.com/sirupsen/logrus"
)

// ErrRestoreSignatureNotMatched is the sentinel error used when a restore job
// is cancelled and its status text reports that a table already exists with a
// different schema. Callers detect it with errors.Is on the wrapped error.
var ErrRestoreSignatureNotMatched = xerror.NewWithoutStack(xerror.Normal, "The signature is not matched, the table already exist but with different schema")

const (
// BACKUP_CHECK_DURATION is a 3-second interval.
// NOTE(review): its use is not visible in this view; presumably the pause
// between backup state polls — confirm against CheckBackupFinished.
BACKUP_CHECK_DURATION = time.Second * 3
// RESTORE_CHECK_DURATION is how long the restore pollers sleep between two
// consecutive restore state checks.
RESTORE_CHECK_DURATION = time.Second * 3
// MAX_CHECK_RETRY_TIMES bounds the number of polling iterations.
MAX_CHECK_RETRY_TIMES = 86400 // 86400 checks * 3s sleep = roughly 3 days
// SIGNATURE_NOT_MATCHED is the substring searched for in a cancelled
// restore's status text to recognise a schema (signature) mismatch.
SIGNATURE_NOT_MATCHED = "already exist but with different schema"
)

type BackupState int
Expand Down Expand Up @@ -46,7 +50,6 @@ func (s BackupState) String() string {

// formatKeyWordName strips leading and trailing whitespace from name and
// encloses the result in backticks so it can be embedded in a SQL statement
// as a quoted identifier.
func formatKeyWordName(name string) string {
	const quote = "`"
	trimmed := strings.TrimSpace(name)
	return quote + trimmed + quote
}

func ParseBackupState(state string) BackupState {
Expand Down Expand Up @@ -534,51 +537,59 @@ func (s *Spec) CheckBackupFinished(snapshotName string) (bool, error) {
}

// checkRestoreFinished issues a SHOW RESTORE query for the label snapshotName
// in s.Database and reports the parsed "State" column of the first row. In the
// updated signature it additionally returns the raw "Status" column text.
//
// It returns RestoreStateUnknown together with an error when connecting,
// querying, or reading a column fails, or when the query yields no row.
//
// NOTE(review): this listing is a rendered diff without +/- markers, so each
// changed statement appears twice below: the pre-change line first, followed
// by its replacement.
// TODO: Add TaskErrMsg
func (s *Spec) checkRestoreFinished(snapshotName string) (RestoreState, error) {
func (s *Spec) checkRestoreFinished(snapshotName string) (RestoreState, string, error) {
log.Debugf("check restore state %s", snapshotName)

db, err := s.Connect()
if err != nil {
return RestoreStateUnknown, err
return RestoreStateUnknown, "", err
}

// The database name is backtick-quoted; the label is interpolated as a
// double-quoted string literal.
query := fmt.Sprintf("SHOW RESTORE FROM %s WHERE Label = \"%s\"", formatKeyWordName(s.Database), snapshotName)

log.Debugf("check restore state sql: %s", query)
rows, err := db.Query(query)
if err != nil {
return RestoreStateUnknown, xerror.Wrap(err, xerror.Normal, "query restore state failed")
return RestoreStateUnknown, "", xerror.Wrap(err, xerror.Normal, "query restore state failed")
}
defer rows.Close()

var restoreStateStr string
var restoreStatusStr string
// Only the first row of the result set is inspected.
if rows.Next() {
rowParser := utils.NewRowParser()
if err := rowParser.Parse(rows); err != nil {
return RestoreStateUnknown, xerror.Wrap(err, xerror.Normal, "scan restore state failed")
return RestoreStateUnknown, "", xerror.Wrap(err, xerror.Normal, "scan restore state failed")
}
restoreStateStr, err = rowParser.GetString("State")
if err != nil {
return RestoreStateUnknown, xerror.Wrap(err, xerror.Normal, "scan restore state failed")
return RestoreStateUnknown, "", xerror.Wrap(err, xerror.Normal, "scan restore state failed")
}
// Status carries the free-form message that callers search for the
// signature-mismatch text.
restoreStatusStr, err = rowParser.GetString("Status")
if err != nil {
return RestoreStateUnknown, "", xerror.Wrap(err, xerror.Normal, "scan restore status failed")
}

log.Infof("check snapshot %s restore state: [%v]", snapshotName, restoreStateStr)
log.Infof("check snapshot %s restore state: [%v], restore status: %s",
snapshotName, restoreStateStr, restoreStatusStr)

return _parseRestoreState(restoreStateStr), nil
return _parseRestoreState(restoreStateStr), restoreStatusStr, nil
}
// No row came back for this label.
return RestoreStateUnknown, xerror.Errorf(xerror.Normal, "no restore state found")
return RestoreStateUnknown, "", xerror.Errorf(xerror.Normal, "no restore state found")
}

func (s *Spec) CheckRestoreFinished(snapshotName string) (bool, error) {
log.Debugf("check restore state is finished, spec: %s, datebase: %s, snapshot: %s", s.String(), s.Database, snapshotName)

for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ {
if backupState, err := s.checkRestoreFinished(snapshotName); err != nil {
if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil {
return false, err
} else if backupState == RestoreStateFinished {
} else if restoreState == RestoreStateFinished {
return true, nil
} else if backupState == RestoreStateCancelled {
return false, xerror.Errorf(xerror.Normal, "backup failed or canceled, spec: %s, snapshot: %s", s.String(), snapshotName)
} else if restoreState == RestoreStateCancelled && strings.Contains(status, SIGNATURE_NOT_MATCHED) {
return false, xerror.XWrapf(ErrRestoreSignatureNotMatched, "restore failed, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status)
} else if restoreState == RestoreStateCancelled {
return false, xerror.Errorf(xerror.Normal, "restore failed or canceled, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status)
} else {
// RestoreStatePending, RestoreStateUnknown
time.Sleep(RESTORE_CHECK_DURATION)
Expand All @@ -589,6 +600,34 @@ func (s *Spec) CheckRestoreFinished(snapshotName string) (bool, error) {
return false, nil
}

// restoreSignatureNotMatchedPattern extracts the table name from a restore
// status message reporting a schema mismatch. It is compiled once at package
// scope instead of on every call.
var restoreSignatureNotMatchedPattern = regexp.MustCompile("Table (?P<tableName>.*) already exist but with different schema")

// parseSignatureNotMatchedTable returns the table name captured from status,
// or false when status does not match the pattern or the captured name is empty.
func parseSignatureNotMatchedTable(status string) (string, bool) {
	matches := restoreSignatureNotMatchedPattern.FindStringSubmatch(status)
	index := restoreSignatureNotMatchedPattern.SubexpIndex("tableName")
	// FindStringSubmatch returns nil when nothing matches, so the length must
	// be checked before indexing; either failure condition alone is enough
	// to reject the status.
	if index < 0 || len(matches) <= index || len(matches[index]) == 0 {
		return "", false
	}
	return matches[index], true
}

// GetRestoreSignatureNotMatchedTable polls the restore job labelled
// snapshotName and, once it is cancelled because of a signature mismatch,
// returns the name of the offending table parsed from the status message.
//
// Returns:
//   - the table name and a nil error when the mismatch message is parsed;
//   - "" and a nil error when the restore finished, or when the job is still
//     not in a final state after MAX_CHECK_RETRY_TIMES polls (a warning is logged);
//   - "" and an error when the state query fails, the restore was cancelled for
//     another reason, or the table name cannot be extracted from the status.
func (s *Spec) GetRestoreSignatureNotMatchedTable(snapshotName string) (string, error) {
	log.Debugf("get restore signature not matched table, spec: %s, datebase: %s, snapshot: %s", s.String(), s.Database, snapshotName)

	for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ {
		restoreState, status, err := s.checkRestoreFinished(snapshotName)
		if err != nil {
			return "", err
		}

		switch restoreState {
		case RestoreStateFinished:
			return "", nil
		case RestoreStateCancelled:
			if !strings.Contains(status, SIGNATURE_NOT_MATCHED) {
				return "", xerror.Errorf(xerror.Normal, "restore failed or canceled, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status)
			}
			tableName, ok := parseSignatureNotMatchedTable(status)
			if !ok {
				return "", xerror.Errorf(xerror.Normal, "match table name from restore status failed, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status)
			}
			return tableName, nil
		default:
			// RestoreStatePending, RestoreStateUnknown
			time.Sleep(RESTORE_CHECK_DURATION)
		}
	}

	log.Warnf("get restore signature not matched timeout, max try times: %d, spec: %s, snapshot: %s", MAX_CHECK_RETRY_TIMES, s, snapshotName)
	return "", nil
}

func (s *Spec) waitTransactionDone(txnId int64) error {
db, err := s.Connect()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/ccr/base/specer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Specer interface {
CheckTableExists() (bool, error)
CreateSnapshotAndWaitForDone(tables []string) (string, error)
CheckRestoreFinished(snapshotName string) (bool, error)
GetRestoreSignatureNotMatchedTable(snapshotName string) (string, error)
WaitTransactionDone(txnId int64) // busy wait

Exec(sql string) error
Expand Down
23 changes: 22 additions & 1 deletion pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,28 @@ func (j *Job) fullSync() error {

for {
restoreFinished, err := j.IDest.CheckRestoreFinished(restoreSnapshotName)
if err != nil {
if err != nil && errors.Is(err, base.ErrRestoreSignatureNotMatched) {
// We need to rebuild the existing table.
var tableName string
if j.SyncType == TableSync {
tableName = j.Dest.Table
} else {
tableName, err = j.IDest.GetRestoreSignatureNotMatchedTable(restoreSnapshotName)
if err != nil || len(tableName) == 0 {
continue
}
}
log.Infof("the signature of table %s is not matched with the target table in snapshot", tableName)
for {
dropSql := fmt.Sprintf("DROP TABLE %s FORCE", tableName)
log.Infof("drop table sql: %s", dropSql)
if err := j.destMeta.DbExec(dropSql); err == nil {
break
}
}
log.Infof("the restore is cancelled, the unmatched table %s is dropped, restore snapshot again", tableName)
break
} else if err != nil {
return err
}

Expand Down

0 comments on commit b6bffd3

Please sign in to comment.