Skip to content

Commit

Permalink
Add support to take parallel backups
Browse files Browse the repository at this point in the history
  • Loading branch information
rda3mon committed Jun 17, 2022
1 parent 5e5d18b commit 7b16091
Show file tree
Hide file tree
Showing 18 changed files with 545 additions and 220 deletions.
74 changes: 74 additions & 0 deletions dev-support/design-docs/parallel-backup/existing_design.uml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
@startuml
/'
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'/
start
: (1.1) Create Backup;
if ( hasActiveSession or inInconsistentState?) then (yes)
: Fail;
stop
else (no)
: (1.2.1) Create Backup Request and execute;
: (1.2.2) Create dirs in destination;
: (1.2.3) Create Backup Client and execute;
: (1.2.4) Create exclusive backup session;
: (1.2.5) Take Backup Table Snapshot;
: (1.2.6) Set State to RUNNING and phase to REQUEST;
if ( full backup?) then (yes)
: (1.2.7.1.1) Read last backup start time or 0L;
: (1.2.7.1.2) Perform LogRoll Procedure;
: (1.2.7.1.3) Record WAL older than LogRoll to system table;
: (1.2.7.1.4) Set Phase to SNAPSHOT;
: (1.2.7.1.5) Take Snapshot of every table;
: (1.2.7.1.6) Export Snapshot to dest dir;
: (1.2.7.1.7) Write start time for next backup to system table;
: (1.2.7.1.8) Add Manifest;
: (1.2.7.1.9) Delete Snapshots;
: (1.2.7.1.10) Cleanup Export snapshot log;
else (no)
: (1.2.7.2.1) Set phase to PREPARE_INCREMENTAL;
if (fail to get log file map?) then (yes)
: Fail;
stop
else (no)
: (1.2.7.2.2) Copy table and region info;
: (1.2.7.2.3) MR to convert WAL into HFiles;
: (1.2.7.2.4) Copy HFiles into dest with DistCP
: (1.2.7.2.5) Record WAL older than what is copied;
: (1.2.7.2.6) Write start time for next backup to system table;
: (1.2.7.2.7) Add Manifest;
: (1.2.7.2.8) Cleanup DistCp log;
endif
endif
: (1.2.8) Delete System Table Snapshot;
: (1.2.9) Update BackupInfo with Status Complete;
: (1.2.10) Clear exclusive backup sesion;
endif
stop

start
: Fail;
: (2.1) Set State to FAILED;
if (full backup?) then (yes)
: (2.2.1.1) Delete all snapshots;
: (2.2.1.2)Cleanup export snapshot log;
endif
: (2.3) Restore backup system table from snapshot;
: (2.4) Delete backup system table snapshot;
: (2.5) Cleanup Target Dir;
stop
@enduml
76 changes: 76 additions & 0 deletions dev-support/design-docs/parallel-backup/proposed_design.uml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
@startuml
/'
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'/

start
: (1.1) Create Backup;
: (1.2.1) Create Backup Request and execute;
: (1.2.2) Create dirs in destination;
: (1.2.3) Create Backup Client and execute;
#Orange: (1.2.4) Create exclusive backup session;
note right: Table exclusive lock
#Red: (1.2.5) Take Backup Table Snapshot;
note right: To be removed
: (1.2.6) Set State to RUNNING and phase to REQUEST;
if ( full backup?) then (yes)
: (1.2.7.1.1) Read last backup start time or 0L;
: (1.2.7.1.2) Perform LogRoll Procedure;
: (1.2.7.1.3) Record WAL older than LogRoll to system table;
: (1.2.7.1.4) Set Phase to SNAPSHOT;
: (1.2.7.1.5) Take Snapshot of every table;
: (1.2.7.1.6) Export Snapshot to dest dir;
: (1.2.7.1.7) Write start time for next backup to system table;
: (1.2.7.1.8) Add Manifest;
: (1.2.7.1.9) Delete Snapshots;
: (1.2.7.1.10) Cleanup Export snapshot log;
else (no)
: (1.2.7.2.1) Set phase to PREPARE_INCREMENTAL;
if (fail to get log file map?) then (yes)
: Fail;
stop
else (no)
: (1.2.7.2.2) Copy table and region info;
: (1.2.7.2.3) MR to convert WAL into HFiles;
: (1.2.7.2.4) Copy HFiles into dest with DistCP
: (1.2.7.2.5) Record WAL older than what is copied;
: (1.2.7.2.6) Write start time for next backup to system table;
: (1.2.7.2.7) Add Manifest;
: (1.2.7.2.8) Cleanup DistCp log;
endif
endif
#RED: (1.2.8) Delete System Table Snapshot;
note right: To be removed
: (1.2.9) Update BackupInfo with Status Complete;
#Orange: (1.2.10) Clear exclusive backup sesion;
note right: Clear table exclusive lock
stop

start
: Fail;
: (2.1) Set State to FAILED;
if (full backup?) then (yes)
: (2.2.1.1) Delete all snapshots;
: (2.2.1.2) Cleanup export snapshot log;
endif
#RED: (2.3) Restore backup system table from snapshot;
note right: To be removed
#RED: (2.4) Delete backup system table snapshot;
note right: To be removed
: (2.5) Cleanup Target Dir;
stop
@enduml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,13 @@ public interface BackupRestoreConstants {

String CONF_STAGING_ROOT = "snapshot.export.staging.root";

String BACKUPID_PREFIX = "backup_";
String BACKUPID_PREFIX = "backup";

String UNDERSCORE = "_";

static String getBackupPrefix() {
return BackupRestoreConstants.BACKUPID_PREFIX + BackupRestoreConstants.UNDERSCORE;
}

enum BackupCommand {
CREATE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -94,15 +95,18 @@ public int deleteBackups(String[] backupIds) throws IOException {

int totalDeleted = 0;
Map<String, HashSet<TableName>> allTablesMap = new HashMap<>();
List<BackupInfo> backupInfos = new ArrayList<>();
for (String backupId : backupIds) {
backupInfos.add(getBackupInfo(backupId));
}

boolean deleteSessionStarted;
boolean snapshotDone;
try (final BackupSystemTable sysTable = new BackupSystemTable(conn)) {
// Step 1: Make sure there is no active session
// is running by using startBackupSession API
// If there is an active session in progress, exception will be thrown
try {
sysTable.startBackupExclusiveOperation();
sysTable.startBackupExclusiveOperation(backupInfos);
deleteSessionStarted = true;
} catch (IOException e) {
LOG.warn("You can not run delete command while active backup session is in progress. \n"
Expand All @@ -121,13 +125,6 @@ public int deleteBackups(String[] backupIds) throws IOException {

// Step 3: Record delete session
sysTable.startDeleteOperation(backupIds);
// Step 4: Snapshot backup system table
if (!BackupSystemTable.snapshotExists(conn)) {
BackupSystemTable.snapshot(conn);
} else {
LOG.warn("Backup system table snapshot exists");
}
snapshotDone = true;
try {
for (int i = 0; i < backupIds.length; i++) {
BackupInfo info = sysTable.readBackupInfo(backupIds[i]);
Expand All @@ -145,28 +142,11 @@ public int deleteBackups(String[] backupIds) throws IOException {
finalizeDelete(allTablesMap, sysTable);
// Finish
sysTable.finishDeleteOperation();
// delete snapshot
BackupSystemTable.deleteSnapshot(conn);
} catch (IOException e) {
// Fail delete operation
// Step 1
if (snapshotDone) {
if (BackupSystemTable.snapshotExists(conn)) {
BackupSystemTable.restoreFromSnapshot(conn);
// delete snapshot
BackupSystemTable.deleteSnapshot(conn);
// We still have record with unfinished delete operation
LOG.error("Delete operation failed, please run backup repair utility to restore "
+ "backup system integrity", e);
throw e;
} else {
LOG.warn("Delete operation succeeded, there were some errors: ", e);
}
}

LOG.warn("Delete operation succeeded, there were some errors: ", e);
} finally {
if (deleteSessionStarted) {
sysTable.finishBackupExclusiveOperation();
sysTable.finishBackupExclusiveOperation(Arrays.asList(backupIds));
}
}
}
Expand Down Expand Up @@ -524,7 +504,8 @@ public String backupTables(BackupRequest request) throws IOException {
String targetRootDir = request.getTargetRootDir();
List<TableName> tableList = request.getTableList();

String backupId = BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime();
String backupId =
BackupRestoreConstants.getBackupPrefix() + EnvironmentEdgeManager.currentTime();
if (type == BackupType.INCREMENTAL) {
Set<TableName> incrTableSet;
try (BackupSystemTable table = new BackupSystemTable(conn)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -257,7 +258,7 @@ public static class CreateCommand extends Command {

@Override
protected boolean requiresNoActiveSession() {
return true;
return false;
}

@Override
Expand Down Expand Up @@ -334,12 +335,12 @@ public void execute() throws IOException {
System.setProperty("mapreduce.job.queuename", queueName);
}

List<TableName> tablesList = Lists.newArrayList(BackupUtils.parseTableNames(tables));

try (BackupAdminImpl admin = new BackupAdminImpl(conn)) {
BackupRequest.Builder builder = new BackupRequest.Builder();
BackupRequest request = builder.withBackupType(BackupType.valueOf(args[1].toUpperCase()))
.withTableList(
tables != null ? Lists.newArrayList(BackupUtils.parseTableNames(tables)) : null)
.withTargetRootDir(targetBackupDir).withTotalTasks(workers)
.withTableList(tablesList).withTargetRootDir(targetBackupDir).withTotalTasks(workers)
.withBandwidthPerTasks(bandwidth).withBackupSetName(setName).build();
String backupId = admin.backupTables(request);
System.out.println("Backup session " + backupId + " finished. Status: SUCCESS");
Expand Down Expand Up @@ -671,35 +672,35 @@ public void execute() throws IOException {
try (final Connection conn = ConnectionFactory.createConnection(conf);
final BackupSystemTable sysTable = new BackupSystemTable(conn)) {
// Failed backup
BackupInfo backupInfo;
List<BackupInfo> list = sysTable.getBackupInfos(BackupState.RUNNING);
if (list.size() == 0) {
List<BackupInfo> backupInfos = sysTable.getBackupInfos(BackupState.RUNNING);
if (backupInfos.size() == 0) {
// No failed sessions found
System.out.println("REPAIR status: no failed sessions found."
+ " Checking failed delete backup operation ...");
repairFailedBackupDeletionIfAny(conn, sysTable);
repairFailedBackupMergeIfAny(conn, sysTable);
return;
}
backupInfo = list.get(0);
// If this is a cancel exception, then we've already cleaned.
// set the failure timestamp of the overall backup
backupInfo.setCompleteTs(EnvironmentEdgeManager.currentTime());
// set failure message
backupInfo.setFailedMsg("REPAIR status: repaired after failure:\n" + backupInfo);
// set overall backup status: failed
backupInfo.setState(BackupState.FAILED);
// compose the backup failed data
String backupFailedData = "BackupId=" + backupInfo.getBackupId() + ",startts="
+ backupInfo.getStartTs() + ",failedts=" + backupInfo.getCompleteTs() + ",failedphase="
+ backupInfo.getPhase() + ",failedmessage=" + backupInfo.getFailedMsg();
System.out.println(backupFailedData);
TableBackupClient.cleanupAndRestoreBackupSystem(conn, backupInfo, conf);
// If backup session is updated to FAILED state - means we
// processed recovery already.
sysTable.updateBackupInfo(backupInfo);
sysTable.finishBackupExclusiveOperation();
System.out.println("REPAIR status: finished repair failed session:\n " + backupInfo);
for (BackupInfo backupInfo : backupInfos) {
// If this is a cancel exception, then we've already cleaned.
// set the failure timestamp of the overall backup
backupInfo.setCompleteTs(EnvironmentEdgeManager.currentTime());
// set failure message
backupInfo.setFailedMsg("REPAIR status: repaired after failure:\n" + backupInfo);
// set overall backup status: failed
backupInfo.setState(BackupState.FAILED);
// compose the backup failed data
String backupFailedData = "BackupId=" + backupInfo.getBackupId() + ",startts="
+ backupInfo.getStartTs() + ",failedts=" + backupInfo.getCompleteTs() + ",failedphase="
+ backupInfo.getPhase() + ",failedmessage=" + backupInfo.getFailedMsg();
System.out.println(backupFailedData);
TableBackupClient.cleanupAndRestoreBackupSystem(conn, backupInfo, conf);
// If backup session is updated to FAILED state - means we
// processed recovery already.
sysTable.updateBackupInfo(backupInfo);
sysTable.finishBackupExclusiveOperation(Arrays.asList(backupInfo.getBackupId()));
System.out.println("REPAIR status: finished repair failed session:\n " + backupInfo);
}
}
}

Expand All @@ -708,16 +709,12 @@ private void repairFailedBackupDeletionIfAny(Connection conn, BackupSystemTable
String[] backupIds = sysTable.getListOfBackupIdsFromDeleteOperation();
if (backupIds == null || backupIds.length == 0) {
System.out.println("No failed backup DELETE operation found");
// Delete backup table snapshot if exists
BackupSystemTable.deleteSnapshot(conn);
return;
}
System.out.println("Found failed DELETE operation for: " + StringUtils.join(backupIds));
System.out.println("Running DELETE again ...");
// Restore table from snapshot
BackupSystemTable.restoreFromSnapshot(conn);
// Finish previous failed session
sysTable.finishBackupExclusiveOperation();
sysTable.finishBackupExclusiveOperation(Arrays.asList(backupIds));
try (BackupAdmin admin = new BackupAdminImpl(conn)) {
admin.deleteBackups(backupIds);
}
Expand All @@ -730,8 +727,6 @@ public static void repairFailedBackupMergeIfAny(Connection conn, BackupSystemTab
String[] backupIds = sysTable.getListOfBackupIdsFromMergeOperation();
if (backupIds == null || backupIds.length == 0) {
System.out.println("No failed backup MERGE operation found");
// Delete backup table snapshot if exists
BackupSystemTable.deleteSnapshot(conn);
return;
}
System.out.println("Found failed MERGE operation for: " + StringUtils.join(backupIds));
Expand All @@ -757,10 +752,8 @@ public static void repairFailedBackupMergeIfAny(Connection conn, BackupSystemTab
} else {
checkRemoveBackupImages(fs, backupRoot, backupIds);
}
// Restore table from snapshot
BackupSystemTable.restoreFromSnapshot(conn);
// Unlock backup system
sysTable.finishBackupExclusiveOperation();
sysTable.finishBackupExclusiveOperation(Arrays.asList(backupIds));
// Finish previous failed session
sysTable.finishMergeOperation();

Expand Down
Loading

0 comments on commit 7b16091

Please sign in to comment.