Skip to content

Commit

Permalink
[feature](restore) Support clean_restore property for restore job
Browse files Browse the repository at this point in the history
The restore will keep the existing tables/partitions in which the
restore target is not contained, this PR adds a property clean_restore
to indicate that the restore job needs to recycle those
tables/partitions
  • Loading branch information
w41ter committed Aug 7, 2024
1 parent 3aadf92 commit 7b552a9
Show file tree
Hide file tree
Showing 9 changed files with 360 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@

public class RestoreStmt extends AbstractBackupStmt {
private static final String PROP_ALLOW_LOAD = "allow_load";
private static final String PROP_REPLICATION_NUM = "replication_num";
private static final String PROP_BACKUP_TIMESTAMP = "backup_timestamp";
private static final String PROP_META_VERSION = "meta_version";
private static final String PROP_RESERVE_REPLICA = "reserve_replica";
private static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE = "reserve_dynamic_partition_enable";
private static final String PROP_IS_BEING_SYNCED = PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED;

public static final String PROP_RESERVE_REPLICA = "reserve_replica";
public static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE = "reserve_dynamic_partition_enable";
public static final String PROP_CLEAN_RESTORE = "clean_restore";

private boolean allowLoad = false;
private ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
private String backupTimestamp = null;
Expand All @@ -50,6 +51,7 @@ public class RestoreStmt extends AbstractBackupStmt {
private boolean reserveDynamicPartitionEnable = false;
private boolean isLocal = false;
private boolean isBeingSynced = false;
private boolean isCleanRestore = false;
private byte[] meta = null;
private byte[] jobInfo = null;

Expand Down Expand Up @@ -109,6 +111,10 @@ public boolean isBeingSynced() {
return isBeingSynced;
}

public boolean isCleanRestore() {
return isCleanRestore;
}

@Override
public void analyze(Analyzer analyzer) throws UserException {
if (repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) {
Expand Down Expand Up @@ -223,6 +229,19 @@ public void analyzeProperties() throws AnalysisException {
copiedProperties.remove(PROP_IS_BEING_SYNCED);
}

// is clean restore
if (copiedProperties.containsKey(PROP_CLEAN_RESTORE)) {
if (copiedProperties.get(PROP_CLEAN_RESTORE).equalsIgnoreCase("true")) {
isCleanRestore = true;
} else if (copiedProperties.get(PROP_CLEAN_RESTORE).equalsIgnoreCase("false")) {
isCleanRestore = false;
} else {
ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
"Invalid clean_restore value: " + copiedProperties.get(PROP_CLEAN_RESTORE));
}
copiedProperties.remove(PROP_CLEAN_RESTORE);
}

if (!copiedProperties.isEmpty()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
"Unknown restore job properties: " + copiedProperties.keySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,12 +531,12 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throw
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), metaVersion, stmt.reserveReplica(),
stmt.reserveDynamicPartitionEnable(), stmt.isBeingSynced(),
env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta);
stmt.isCleanRestore(), env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta);
} else {
restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(),
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(),
stmt.isBeingSynced(), env, repository.getId());
stmt.isBeingSynced(), stmt.isCleanRestore(), env, repository.getId());
}

env.getEditLog().logRestoreJob(restoreJob);
Expand Down
83 changes: 77 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.backup;

import org.apache.doris.analysis.BackupStmt.BackupContent;
import org.apache.doris.analysis.RestoreStmt;
import org.apache.doris.backup.BackupJobInfo.BackupIndexInfo;
import org.apache.doris.backup.BackupJobInfo.BackupOlapTableInfo;
import org.apache.doris.backup.BackupJobInfo.BackupPartitionInfo;
Expand Down Expand Up @@ -65,6 +66,7 @@
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
Expand Down Expand Up @@ -110,9 +112,11 @@
import java.util.stream.Collectors;

public class RestoreJob extends AbstractJob implements GsonPostProcessable {
private static final String PROP_RESERVE_REPLICA = "reserve_replica";
private static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE = "reserve_dynamic_partition_enable";
private static final String PROP_RESERVE_REPLICA = RestoreStmt.PROP_RESERVE_REPLICA;
private static final String PROP_RESERVE_DYNAMIC_PARTITION_ENABLE =
RestoreStmt.PROP_RESERVE_DYNAMIC_PARTITION_ENABLE;
private static final String PROP_IS_BEING_SYNCED = PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED;
private static final String PROP_CLEAN_RESTORE = RestoreStmt.PROP_CLEAN_RESTORE;

private static final Logger LOG = LogManager.getLogger(RestoreJob.class);

Expand Down Expand Up @@ -192,6 +196,9 @@ public enum RestoreJobState {

private boolean isBeingSynced = false;

// Whether to delete existing tables/partitions that are not involved in the restore.
private boolean isCleanRestore = false;

// restore properties
@SerializedName("prop")
private Map<String, String> properties = Maps.newHashMap();
Expand All @@ -202,7 +209,8 @@ public RestoreJob() {

public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
boolean reserveDynamicPartitionEnable, boolean isBeingSynced, Env env, long repoId) {
boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanRestore,
Env env, long repoId) {
super(JobType.RESTORE, label, dbId, dbName, timeoutMs, env, repoId);
this.backupTimestamp = backupTs;
this.jobInfo = jobInfo;
Expand All @@ -217,16 +225,19 @@ public RestoreJob(String label, String backupTs, long dbId, String dbName, Backu
}
this.reserveDynamicPartitionEnable = reserveDynamicPartitionEnable;
this.isBeingSynced = isBeingSynced;
this.isCleanRestore = isCleanRestore;
properties.put(PROP_RESERVE_REPLICA, String.valueOf(reserveReplica));
properties.put(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, String.valueOf(reserveDynamicPartitionEnable));
properties.put(PROP_IS_BEING_SYNCED, String.valueOf(isBeingSynced));
properties.put(PROP_CLEAN_RESTORE, String.valueOf(isCleanRestore));
}

public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
boolean reserveDynamicPartitionEnable, boolean isBeingSynced, Env env, long repoId, BackupMeta backupMeta) {
boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanRestore,
Env env, long repoId, BackupMeta backupMeta) {
this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc, timeoutMs, metaVersion, reserveReplica,
reserveDynamicPartitionEnable, isBeingSynced, env, repoId);
reserveDynamicPartitionEnable, isBeingSynced, isCleanRestore, env, repoId);
this.backupMeta = backupMeta;
}

Expand Down Expand Up @@ -894,7 +905,7 @@ private void checkAndPrepareMeta() {

if (ok) {
if (LOG.isDebugEnabled()) {
LOG.debug("finished to create all restored replcias. {}", this);
LOG.debug("finished to create all restored replicas. {}", this);
}
// add restored partitions.
// table should be in State RESTORE, so no other partitions can be
Expand Down Expand Up @@ -1833,6 +1844,14 @@ private Status allTabletCommitted(boolean isReplay) {
}
}

// Drop the exists but non-restored table/partitions.
if (isCleanRestore) {
Status st = dropAllNonRestoredTableAndPartitions(db);
if (!st.ok()) {
return st;
}
}

if (!isReplay) {
restoredPartitions.clear();
restoredTbls.clear();
Expand All @@ -1855,6 +1874,56 @@ private Status allTabletCommitted(boolean isReplay) {
return Status.OK;
}

private Status dropAllNonRestoredTableAndPartitions(Database db) {
try {
for (Table table : db.getTables()) {
long tableId = table.getId();
String tableName = table.getName();
TableType tableType = table.getType();
BackupOlapTableInfo backupTableInfo = jobInfo.backupOlapTableObjects.get(tableName);
if (tableType != TableType.OLAP && tableType != TableType.ODBC && tableType != TableType.VIEW) {
continue;
}
if (tableType == TableType.OLAP && backupTableInfo != null) {
// drop the non restored partitions.
dropNonRestoredPartitions(db, (OlapTable) table, backupTableInfo);
} else {
// otherwise drop the entire table.
LOG.info("drop non restored table {}({}). {}", tableName, tableId, this);
env.getInternalCatalog().dropTableWithoutCheck(db, table, true);
}
}
return Status.OK;
} catch (Exception e) {
LOG.warn("drop all non restored table and partitions failed. {}", this, e);
return new Status(ErrCode.COMMON_ERROR, e.getMessage());
}
}

private void dropNonRestoredPartitions(
Database db, OlapTable table, BackupOlapTableInfo backupTableInfo) throws DdlException {
if (!table.writeLockIfExist()) {
return;
}

try {
long tableId = table.getId();
String tableName = table.getQualifiedName();
InternalCatalog catalog = env.getInternalCatalog();
for (String partitionName : table.getPartitionNames()) {
if (backupTableInfo.containsPart(partitionName)) {
continue;
}

LOG.info("drop non restored partition {} of table {}({}). {}",
partitionName, tableName, tableId, this);
catalog.dropPartitionWithoutCheck(db, table, partitionName, false, true);
}
} finally {
table.writeUnlock();
}
}

private void releaseSnapshots() {
if (snapshotInfos.isEmpty()) {
return;
Expand Down Expand Up @@ -2192,13 +2261,15 @@ public void readFields(DataInput in) throws IOException {
reserveReplica = Boolean.parseBoolean(properties.get(PROP_RESERVE_REPLICA));
reserveDynamicPartitionEnable = Boolean.parseBoolean(properties.get(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE));
isBeingSynced = Boolean.parseBoolean(properties.get(PROP_IS_BEING_SYNCED));
isCleanRestore = Boolean.parseBoolean(properties.get(PROP_CLEAN_RESTORE));
}

@Override
public void gsonPostProcess() throws IOException {
reserveReplica = Boolean.parseBoolean(properties.get(PROP_RESERVE_REPLICA));
reserveDynamicPartitionEnable = Boolean.parseBoolean(properties.get(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE));
isBeingSynced = Boolean.parseBoolean(properties.get(PROP_IS_BEING_SYNCED));
isCleanRestore = Boolean.parseBoolean(properties.get(PROP_CLEAN_RESTORE));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ public void setState(PartitionState state) {
public void updateVersionForRestore(long visibleVersion) {
this.setVisibleVersion(visibleVersion);
this.nextVersion = this.visibleVersion + 1;
LOG.info("update partition {} version for restore: visible: {}, next: {}",
name, visibleVersion, nextVersion);
LOG.info("update partition {}({}) version for restore: visible: {}, next: {}",
name, id, visibleVersion, nextVersion);
}

public void updateVisibleVersion(long visibleVersion) {
Expand Down
Loading

0 comments on commit 7b552a9

Please sign in to comment.