Skip to content

Commit

Permalink
Merge branch 'dev' into ccr_tb_to_tb
Browse files Browse the repository at this point in the history
  • Loading branch information
wyxxxcat committed Nov 7, 2024
2 parents 2fd86b3 + a134ed7 commit ae4b1de
Show file tree
Hide file tree
Showing 72 changed files with 616 additions and 408 deletions.
112 changes: 49 additions & 63 deletions pkg/ccr/base/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,64 +628,57 @@ func (s *Spec) CheckTableExistsByName(tableName string) (bool, error) {
}

// mysql> BACKUP SNAPSHOT ccr.snapshot_20230605 TO `__keep_on_local__` ON ( src_1 ) PROPERTIES ("type" = "full");
func (s *Spec) CreateSnapshot(tables []string) (string, error) {
func (s *Spec) CreateSnapshot(snapshotName string, tables []string) error {
if tables == nil {
tables = make([]string, 0)
}
if len(tables) == 0 {
tables = append(tables, s.Table)
}

var snapshotName string
var tableRefs string
if len(tables) == 1 {
// snapshot name format "ccrs_${table}_${timestamp}"
// table refs = table
snapshotName = fmt.Sprintf("ccrs_%s_%s_%d", s.Database, s.Table, time.Now().Unix())
tableRefs = utils.FormatKeywordName(tables[0])
} else {
// snapshot name format "ccrs_${db}_${timestamp}"
// table refs = tables.join(", ")
snapshotName = fmt.Sprintf("ccrs_%s_%d", s.Database, time.Now().Unix())
tableRefs = "`" + strings.Join(tables, "`,`") + "`"
}

// means source is a empty db, table number is 0
if tableRefs == "``" {
return "", xerror.Errorf(xerror.Normal, "source db is empty! you should have at least one table")
return xerror.Errorf(xerror.Normal, "source db is empty! you should have at least one table")
}

db, err := s.Connect()
if err != nil {
return "", err
return err
}

backupSnapshotSql := fmt.Sprintf("BACKUP SNAPSHOT %s.%s TO `__keep_on_local__` ON ( %s ) PROPERTIES (\"type\" = \"full\")", utils.FormatKeywordName(s.Database), utils.FormatKeywordName(snapshotName), tableRefs)
log.Infof("create snapshot %s.%s, backup snapshot sql: %s", s.Database, snapshotName, backupSnapshotSql)
_, err = db.Exec(backupSnapshotSql)
if err != nil {
return "", xerror.Wrapf(err, xerror.Normal, "backup snapshot %s failed, sql: %s", snapshotName, backupSnapshotSql)
return xerror.Wrapf(err, xerror.Normal, "backup snapshot %s failed, sql: %s", snapshotName, backupSnapshotSql)
}

return snapshotName, nil
return nil
}

// mysql> BACKUP SNAPSHOT ccr.snapshot_20230605 TO `__keep_on_local__` ON (src_1 PARTITION (`p1`)) PROPERTIES ("type" = "full");
func (s *Spec) CreatePartialSnapshot(table string, partitions []string) (string, error) {
func (s *Spec) CreatePartialSnapshot(snapshotName, table string, partitions []string) error {
if len(table) == 0 {
return "", xerror.Errorf(xerror.Normal, "source db is empty! you should have at least one table")
return xerror.Errorf(xerror.Normal, "source db is empty! you should have at least one table")
}

// snapshot name format "ccrp_${table}_${timestamp}"
// table refs = table
snapshotName := fmt.Sprintf("ccrp_%s_%s_%d", s.Database, s.Table, time.Now().Unix())
tableRef := utils.FormatKeywordName(table)

log.Infof("create partial snapshot %s.%s", s.Database, snapshotName)

db, err := s.Connect()
if err != nil {
return "", err
return err
}

partitionRefs := ""
Expand All @@ -698,10 +691,10 @@ func (s *Spec) CreatePartialSnapshot(table string, partitions []string) (string,
log.Debugf("backup partial snapshot sql: %s", backupSnapshotSql)
_, err = db.Exec(backupSnapshotSql)
if err != nil {
return "", xerror.Wrapf(err, xerror.Normal, "backup partial snapshot %s failed, sql: %s", snapshotName, backupSnapshotSql)
return xerror.Wrapf(err, xerror.Normal, "backup partial snapshot %s failed, sql: %s", snapshotName, backupSnapshotSql)
}

return snapshotName, nil
return nil
}

// TODO: Add TaskErrMsg
Expand Down Expand Up @@ -757,103 +750,102 @@ func (s *Spec) CheckBackupFinished(snapshotName string) (bool, error) {
}
}

func (s *Spec) CancelBackupIfExists() error {
log.Debugf("cancel backup job if exists, database: %s", s.Database)
// Get the valid (running or finished) backup job with a unique prefix to indicate
// if a backup job needs to be issued again.
func (s *Spec) GetValidBackupJob(snapshotNamePrefix string) (string, error) {
log.Debugf("get valid backup job if exists, database: %s, label prefix: %s", s.Database, snapshotNamePrefix)

db, err := s.Connect()
if err != nil {
return err
return "", err
}

query := fmt.Sprintf("SHOW BACKUP FROM %s", utils.FormatKeywordName(s.Database))
query := fmt.Sprintf("SHOW BACKUP FROM %s WHERE SnapshotName LIKE \"%s%%\"",
utils.FormatKeywordName(s.Database), snapshotNamePrefix)
log.Infof("show backup state sql: %s", query)
rows, err := db.Query(query)
if err != nil {
return xerror.Wrap(err, xerror.Normal, "query backup state failed")
return "", xerror.Wrap(err, xerror.Normal, "query backup state failed")
}
defer rows.Close()

labels := make([]string, 0)
for rows.Next() {
rowParser := utils.NewRowParser()
if err := rowParser.Parse(rows); err != nil {
return xerror.Wrap(err, xerror.Normal, "scan backup state failed")
return "", xerror.Wrap(err, xerror.Normal, "scan backup state failed")
}

info, err := parseBackupInfo(rowParser)
if err != nil {
return xerror.Wrap(err, xerror.Normal, "scan backup state failed")
return "", xerror.Wrap(err, xerror.Normal, "scan backup state failed")
}

log.Infof("check snapshot %s backup state [%v], create time: %s",
info.SnapshotName, info.StateStr, info.CreateTime)

// Only cancel the running backup job issued by syncer
if !isSyncerIssuedJob(info.SnapshotName, s.Database) {
if info.State == BackupStateCancelled {
continue
}

if info.State == BackupStateFinished || info.State == BackupStateCancelled {
continue
}
labels = append(labels, info.SnapshotName)
}

cancelSql := fmt.Sprintf("CANCEL BACKUP FROM %s", s.Database)
log.Infof("cancel backup sql: %s, snapshot: %s", cancelSql, info.SnapshotName)
if _, err = db.Exec(cancelSql); err != nil {
return xerror.Wrapf(err, xerror.Normal,
"cancel backup job %s failed, database: %s", info.SnapshotName, s.Database)
}
// Return the last one. Assume that the result of `SHOW BACKUP` is ordered by CreateTime in ascending order.
if len(labels) != 0 {
return labels[len(labels)-1], nil
}
return nil

return "", nil
}

func (s *Spec) CancelRestoreIfExists(srcDbName string) error {
log.Debugf("cancel restore job if exists, src db: %s", srcDbName)
// Get the valid (running or finished) restore job with a unique prefix to indicate
// if a restore job needs to be issued again.
func (s *Spec) GetValidRestoreJob(snapshotNamePrefix string) (string, error) {
log.Debugf("get valid restore job if exists, label prefix: %s", snapshotNamePrefix)

db, err := s.Connect()
if err != nil {
return err
return "", err
}

query := fmt.Sprintf("SHOW RESTORE FROM %s", utils.FormatKeywordName(s.Database))
log.Debugf("show restore state sql: %s", query)
query := fmt.Sprintf("SHOW RESTORE FROM %s WHERE Label LIKE \"%s%%\"",
utils.FormatKeywordName(s.Database), snapshotNamePrefix)
log.Infof("show restore state sql: %s", query)
rows, err := db.Query(query)
if err != nil {
return xerror.Wrap(err, xerror.Normal, "query restore state failed")
return "", xerror.Wrap(err, xerror.Normal, "query restore state failed")
}
defer rows.Close()

labels := make([]string, 0)
for rows.Next() {
rowParser := utils.NewRowParser()
if err := rowParser.Parse(rows); err != nil {
return xerror.Wrap(err, xerror.Normal, "scan restore state failed")
return "", xerror.Wrap(err, xerror.Normal, "scan restore state failed")
}

info, err := parseRestoreInfo(rowParser)
if err != nil {
return xerror.Wrap(err, xerror.Normal, "scan restore state failed")
return "", xerror.Wrap(err, xerror.Normal, "scan restore state failed")
}

log.Infof("check snapshot %s restore state: [%v], create time: %s",
info.Label, info.StateStr, info.CreateTime)

// Only cancel the running restore job issued by syncer
if !isSyncerIssuedJob(info.Label, srcDbName) {
if info.State == RestoreStateFinished {
continue
}

if info.State == RestoreStateCancelled || info.State == RestoreStateFinished {
continue
}

cancelSql := fmt.Sprintf("CANCEL RESTORE FROM %s", utils.FormatKeywordName(s.Database))
log.Infof("cancel restore sql: %s, running snapshot %s", cancelSql, info.Label)
labels = append(labels, info.Label)
}

_, err = db.Exec(cancelSql)
if err != nil {
return xerror.Wrapf(err, xerror.Normal, "cancel running restore failed, snapshot %s", info.Label)
}
// Return the last one. Assume that the result of `SHOW BACKUP` is ordered by CreateTime in ascending order.
if len(labels) != 0 {
return labels[len(labels)-1], nil
}
return nil

return "", nil
}

// TODO: Add TaskErrMsg
Expand Down Expand Up @@ -1240,9 +1232,3 @@ func correctAddPartitionSql(addPartitionSql string, addPartition *record.AddPart
}
return addPartitionSql
}

func isSyncerIssuedJob(label, dbName string) bool {
fullSyncPrefix := fmt.Sprintf("ccrs_%s", dbName)
partialSyncPrefix := fmt.Sprintf("ccrp_%s", dbName)
return strings.HasPrefix(label, fullSyncPrefix) || strings.HasPrefix(label, partialSyncPrefix)
}
8 changes: 4 additions & 4 deletions pkg/ccr/base/specer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ type Specer interface {
CheckDatabaseExists() (bool, error)
CheckTableExists() (bool, error)
CheckTableExistsByName(tableName string) (bool, error)
CreatePartialSnapshot(table string, partitions []string) (string, error)
CreateSnapshot(tables []string) (string, error)
GetValidBackupJob(snapshotNamePrefix string) (string, error)
GetValidRestoreJob(snapshotNamePrefix string) (string, error)
CreatePartialSnapshot(snapshotName, table string, partitions []string) error
CreateSnapshot(snapshotName string, tables []string) error
CheckBackupFinished(snapshotName string) (bool, error)
CancelBackupIfExists() error
CancelRestoreIfExists(srcDbName string) error
CheckRestoreFinished(snapshotName string) (bool, error)
GetRestoreSignatureNotMatchedTableOrView(snapshotName string) (string, bool, error)
WaitTransactionDone(txnId int64) // busy wait
Expand Down
Loading

0 comments on commit ae4b1de

Please sign in to comment.