diff --git a/pkg/ccr/base/spec.go b/pkg/ccr/base/spec.go index 0c2b7260..4bfa4652 100644 --- a/pkg/ccr/base/spec.go +++ b/pkg/ccr/base/spec.go @@ -3,6 +3,7 @@ package base import ( "database/sql" "fmt" + "regexp" "strconv" "strings" "time" @@ -14,10 +15,13 @@ import ( log "github.com/sirupsen/logrus" ) +var ErrRestoreSignatureNotMatched = xerror.NewWithoutStack(xerror.Normal, "The signature is not matched, the table already exist but with different schema") + const ( BACKUP_CHECK_DURATION = time.Second * 3 RESTORE_CHECK_DURATION = time.Second * 3 MAX_CHECK_RETRY_TIMES = 86400 // 3 day + SIGNATURE_NOT_MATCHED = "already exist but with different schema" ) type BackupState int @@ -46,7 +50,6 @@ func (s BackupState) String() string { func formatKeyWordName(name string) string { return "`" + strings.TrimSpace(name) + "`" - } func ParseBackupState(state string) BackupState { @@ -534,12 +537,12 @@ func (s *Spec) CheckBackupFinished(snapshotName string) (bool, error) { } // TODO: Add TaskErrMsg -func (s *Spec) checkRestoreFinished(snapshotName string) (RestoreState, error) { +func (s *Spec) checkRestoreFinished(snapshotName string) (RestoreState, string, error) { log.Debugf("check restore state %s", snapshotName) db, err := s.Connect() if err != nil { - return RestoreStateUnknown, err + return RestoreStateUnknown, "", err } query := fmt.Sprintf("SHOW RESTORE FROM %s WHERE Label = \"%s\"", formatKeyWordName(s.Database), snapshotName) @@ -547,38 +550,46 @@ func (s *Spec) checkRestoreFinished(snapshotName string) (RestoreState, error) { log.Debugf("check restore state sql: %s", query) rows, err := db.Query(query) if err != nil { - return RestoreStateUnknown, xerror.Wrap(err, xerror.Normal, "query restore state failed") + return RestoreStateUnknown, "", xerror.Wrap(err, xerror.Normal, "query restore state failed") } defer rows.Close() var restoreStateStr string + var restoreStatusStr string if rows.Next() { rowParser := utils.NewRowParser() if err := rowParser.Parse(rows); err != nil { - return RestoreStateUnknown, xerror.Wrap(err, xerror.Normal, "scan restore state failed") + return RestoreStateUnknown, "", xerror.Wrap(err, xerror.Normal, "scan restore state failed") } restoreStateStr, err = rowParser.GetString("State") if err != nil { - return RestoreStateUnknown, xerror.Wrap(err, xerror.Normal, "scan restore state failed") + return RestoreStateUnknown, "", xerror.Wrap(err, xerror.Normal, "scan restore state failed") + } + restoreStatusStr, err = rowParser.GetString("Status") + if err != nil { + return RestoreStateUnknown, "", xerror.Wrap(err, xerror.Normal, "scan restore status failed") } - log.Infof("check snapshot %s restore state: [%v]", snapshotName, restoreStateStr) + log.Infof("check snapshot %s restore state: [%v], restore status: %s", + snapshotName, restoreStateStr, restoreStatusStr) - return _parseRestoreState(restoreStateStr), nil + return _parseRestoreState(restoreStateStr), restoreStatusStr, nil } - return RestoreStateUnknown, xerror.Errorf(xerror.Normal, "no restore state found") + return RestoreStateUnknown, "", xerror.Errorf(xerror.Normal, "no restore state found") } func (s *Spec) CheckRestoreFinished(snapshotName string) (bool, error) { log.Debugf("check restore state is finished, spec: %s, datebase: %s, snapshot: %s", s.String(), s.Database, snapshotName) for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ { - if backupState, err := s.checkRestoreFinished(snapshotName); err != nil { + if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil { return false, err - } else if backupState == RestoreStateFinished { + } else if restoreState == RestoreStateFinished { return true, nil - } else if backupState == RestoreStateCancelled { - return false, xerror.Errorf(xerror.Normal, "backup failed or canceled, spec: %s, snapshot: %s", s.String(), snapshotName) + } else if restoreState == RestoreStateCancelled && strings.Contains(status, SIGNATURE_NOT_MATCHED) { + return false, xerror.XWrapf(ErrRestoreSignatureNotMatched, "restore failed, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status) + } else if restoreState == RestoreStateCancelled { + return false, xerror.Errorf(xerror.Normal, "restore failed or canceled, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status) } else { // RestoreStatePending, RestoreStateUnknown time.Sleep(RESTORE_CHECK_DURATION) @@ -589,6 +600,34 @@ func (s *Spec) CheckRestoreFinished(snapshotName string) (bool, error) { return false, nil } +func (s *Spec) GetRestoreSignatureNotMatchedTable(snapshotName string) (string, error) { + log.Debugf("get restore signature not matched table, spec: %s, datebase: %s, snapshot: %s", s.String(), s.Database, snapshotName) + + for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ { + if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil { + return "", err + } else if restoreState == RestoreStateFinished { + return "", nil + } else if restoreState == RestoreStateCancelled && strings.Contains(status, SIGNATURE_NOT_MATCHED) { + pattern := regexp.MustCompile("Table (?P.*) already exist but with different schema") + matches := pattern.FindStringSubmatch(status) + index := pattern.SubexpIndex("tableName") + if len(matches) < index && len(matches[index]) == 0 { + return "", xerror.Errorf(xerror.Normal, "match table name from restore status failed, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status) + } + return matches[index], nil + } else if restoreState == RestoreStateCancelled { + return "", xerror.Errorf(xerror.Normal, "restore failed or canceled, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status) + } else { + // RestoreStatePending, RestoreStateUnknown + time.Sleep(RESTORE_CHECK_DURATION) + } + } + + log.Warnf("get restore signature not matched timeout, max try times: %d, spec: %s, snapshot: %s", MAX_CHECK_RETRY_TIMES, s, snapshotName) + return "", nil +} + func (s *Spec) waitTransactionDone(txnId int64) error { db, err := s.Connect() if err != nil { diff --git a/pkg/ccr/base/specer.go b/pkg/ccr/base/specer.go index e81e9009..ff24b71a 100644 --- a/pkg/ccr/base/specer.go +++ b/pkg/ccr/base/specer.go @@ -28,6 +28,7 @@ type Specer interface { CheckTableExists() (bool, error) CreateSnapshotAndWaitForDone(tables []string) (string, error) CheckRestoreFinished(snapshotName string) (bool, error) + GetRestoreSignatureNotMatchedTable(snapshotName string) (string, error) WaitTransactionDone(txnId int64) // busy wait Exec(sql string) error diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index 773afb62..7f3d4902 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -442,7 +442,28 @@ func (j *Job) fullSync() error { for { restoreFinished, err := j.IDest.CheckRestoreFinished(restoreSnapshotName) - if err != nil { + if err != nil && errors.Is(err, base.ErrRestoreSignatureNotMatched) { + // We need rebuild the exists table. + var tableName string + if j.SyncType == TableSync { + tableName = j.Dest.Table + } else { + tableName, err = j.IDest.GetRestoreSignatureNotMatchedTable(restoreSnapshotName) + if err != nil || len(tableName) == 0 { + continue + } + } + log.Infof("the signature of table %s is not matched with the target table in snapshot", tableName) + for { + dropSql := fmt.Sprintf("DROP TABLE %s FORCE", tableName) + log.Infof("drop table sql: %s", dropSql) + if err := j.destMeta.DbExec(dropSql); err == nil { + break + } + } + log.Infof("the restore is cancelled, the unmatched table %s is dropped, restore snapshot again", tableName) + break + } else if err != nil { return err }