Skip to content

Commit

Permalink
Restore snapshot with clean_restore during fullsync
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter committed Aug 8, 2024
1 parent a94d5fa commit a17ef30
Show file tree
Hide file tree
Showing 18 changed files with 3,702 additions and 129 deletions.
6 changes: 4 additions & 2 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,8 @@ func (j *Job) partialSync() error {
}
tableRefs = append(tableRefs, tableRef)
}
restoreResp, err := destRpc.RestoreSnapshot(dest, tableRefs, restoreSnapshotName, snapshotResp)
cleanRestore := false // DO NOT drop exists tables and partitions
restoreResp, err := destRpc.RestoreSnapshot(dest, tableRefs, restoreSnapshotName, snapshotResp, cleanRestore)
if err != nil {
return err
}
Expand Down Expand Up @@ -608,7 +609,8 @@ func (j *Job) fullSync() error {
}
tableRefs = append(tableRefs, tableRef)
}
restoreResp, err := destRpc.RestoreSnapshot(dest, tableRefs, restoreSnapshotName, snapshotResp)
cleanRestore := true // drop exists tables and partitions.
restoreResp, err := destRpc.RestoreSnapshot(dest, tableRefs, restoreSnapshotName, snapshotResp, cleanRestore)
if err != nil {
return err
}
Expand Down
27 changes: 14 additions & 13 deletions pkg/rpc/fe.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type IFeRpc interface {
GetBinlog(*base.Spec, int64) (*festruct.TGetBinlogResult_, error)
GetBinlogLag(*base.Spec, int64) (*festruct.TGetBinlogLagResult_, error)
GetSnapshot(*base.Spec, string) (*festruct.TGetSnapshotResult_, error)
RestoreSnapshot(*base.Spec, []*festruct.TTableRef, string, *festruct.TGetSnapshotResult_) (*festruct.TRestoreSnapshotResult_, error)
RestoreSnapshot(*base.Spec, []*festruct.TTableRef, string, *festruct.TGetSnapshotResult_, bool) (*festruct.TRestoreSnapshotResult_, error)
GetMasterToken(*base.Spec) (*festruct.TGetMasterTokenResult_, error)
GetDbMeta(spec *base.Spec) (*festruct.TGetMetaResult_, error)
GetTableMeta(spec *base.Spec, tableIds []int64) (*festruct.TGetMetaResult_, error)
Expand Down Expand Up @@ -384,10 +384,10 @@ func (rpc *FeRpc) GetSnapshot(spec *base.Spec, labelName string) (*festruct.TGet
return convertResult[festruct.TGetSnapshotResult_](result, err)
}

func (rpc *FeRpc) RestoreSnapshot(spec *base.Spec, tableRefs []*festruct.TTableRef, label string, snapshotResult *festruct.TGetSnapshotResult_) (*festruct.TRestoreSnapshotResult_, error) {
func (rpc *FeRpc) RestoreSnapshot(spec *base.Spec, tableRefs []*festruct.TTableRef, label string, snapshotResult *festruct.TGetSnapshotResult_, cleanRestore bool) (*festruct.TRestoreSnapshotResult_, error) {
// return rpc.masterClient.RestoreSnapshot(spec, tableRefs, label, snapshotResult)
caller := func(client IFeRpc) (resultType, error) {
return client.RestoreSnapshot(spec, tableRefs, label, snapshotResult)
return client.RestoreSnapshot(spec, tableRefs, label, snapshotResult, cleanRestore)
}
result, err := rpc.callWithMasterRedirect(caller)
return convertResult[festruct.TRestoreSnapshotResult_](result, err)
Expand Down Expand Up @@ -664,7 +664,7 @@ func (rpc *singleFeClient) GetSnapshot(spec *base.Spec, labelName string) (*fest
// }
//
// Restore Snapshot rpc
func (rpc *singleFeClient) RestoreSnapshot(spec *base.Spec, tableRefs []*festruct.TTableRef, label string, snapshotResult *festruct.TGetSnapshotResult_) (*festruct.TRestoreSnapshotResult_, error) {
func (rpc *singleFeClient) RestoreSnapshot(spec *base.Spec, tableRefs []*festruct.TTableRef, label string, snapshotResult *festruct.TGetSnapshotResult_, cleanRestore bool) (*festruct.TRestoreSnapshotResult_, error) {
// NOTE: ignore meta, because it's too large
log.Debugf("Call RestoreSnapshot, addr: %s, spec: %s", rpc.Address(), spec)

Expand All @@ -673,19 +673,20 @@ func (rpc *singleFeClient) RestoreSnapshot(spec *base.Spec, tableRefs []*festruc
properties := make(map[string]string)
properties["reserve_replica"] = "true"
req := &festruct.TRestoreSnapshotRequest{
Table: &spec.Table,
LabelName: &label,
RepoName: &repoName,
TableRefs: tableRefs,
Properties: properties,
Meta: snapshotResult.GetMeta(),
JobInfo: snapshotResult.GetJobInfo(),
Table: &spec.Table,
LabelName: &label,
RepoName: &repoName,
TableRefs: tableRefs,
Properties: properties,
Meta: snapshotResult.GetMeta(),
JobInfo: snapshotResult.GetJobInfo(),
CleanRestore: &cleanRestore,
}
setAuthInfo(req, spec)

// NOTE: ignore meta, because it's too large
log.Debugf("RestoreSnapshotRequest user %s, db %s, table %s, label name %s, properties %v",
req.GetUser(), req.GetDb(), req.GetTable(), req.GetLabelName(), properties)
log.Debugf("RestoreSnapshotRequest user %s, db %s, table %s, label name %s, properties %v, clean restore: %v",
req.GetUser(), req.GetDb(), req.GetTable(), req.GetLabelName(), properties, cleanRestore)
if resp, err := client.RestoreSnapshot(context.Background(), req); err != nil {
return nil, xerror.Wrapf(err, xerror.RPC, "RestoreSnapshot failed")
} else {
Expand Down
225 changes: 225 additions & 0 deletions pkg/rpc/kitex_gen/backendservice/BackendService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a17ef30

Please sign in to comment.