Skip to content

Commit

Permalink
HBASE-27623 Start a new ReplicationSyncUp after the previous failed (#…
Browse files Browse the repository at this point in the history
…5150)

Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
2005hithlj authored and Apache9 committed Apr 5, 2023
1 parent 851f186 commit f78fe59
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -182,19 +184,56 @@ private void claimReplicationQueues(ReplicationSourceManager mgr, Set<ServerName
}
}

private void writeInfoFile(FileSystem fs) throws IOException {
private void writeInfoFile(FileSystem fs, boolean isForce) throws IOException {
// Record the info of this run. Currently only record the time we run the job. We will use this
// timestamp to clean up the data for last sequence ids and hfile refs in replication queue
// storage. See ReplicationQueueStorage.removeLastSequenceIdsAndHFileRefsBefore.
ReplicationSyncUpToolInfo info =
new ReplicationSyncUpToolInfo(EnvironmentEdgeManager.currentTime());
String json = JsonMapper.writeObjectAsString(info);
Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR);
try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), false)) {
try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), isForce)) {
out.write(Bytes.toBytes(json));
}
}

private static boolean parseOpts(String args[]) {
LinkedList<String> argv = new LinkedList<>();
argv.addAll(Arrays.asList(args));
String cmd = null;
while ((cmd = argv.poll()) != null) {
if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) {
printUsageAndExit(null, 0);
}
if (cmd.equals("-f")) {
return true;
}
if (!argv.isEmpty()) {
printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
}
}
return false;
}

private static void printUsageAndExit(final String message, final int exitCode) {
printUsage(message);
System.exit(exitCode);
}

private static void printUsage(final String message) {
if (message != null && message.length() > 0) {
System.err.println(message);
}
System.err.println("Usage: hbase " + ReplicationSyncUp.class.getName() + " \\");
System.err.println(" <OPTIONS> [-D<property=value>]*");
System.err.println();
System.err.println("General Options:");
System.err.println(" -h|--h|--help Show this help and exit.");
System.err
.println(" -f Start a new ReplicationSyncUp after the previous ReplicationSyncUp failed. "
+ "See HBASE-27623 for details.");
}

@Override
public int run(String[] args) throws Exception {
Abortable abortable = new Abortable() {
Expand All @@ -217,6 +256,7 @@ public boolean isAborted() {
return abort;
}
};
boolean isForce = parseOpts(args);
Configuration conf = getConf();
try (ZKWatcher zkw = new ZKWatcher(conf,
"syncupReplication" + EnvironmentEdgeManager.currentTime(), abortable, true)) {
Expand All @@ -226,7 +266,7 @@ public boolean isAborted() {
Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);

System.out.println("Start Replication Server");
writeInfoFile(fs);
writeInfoFile(fs, isForce);
Replication replication = new Replication();
// use offline table replication queue storage
getConf().setClass(ReplicationStorageFactory.REPLICATION_QUEUE_IMPL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
Expand Down Expand Up @@ -300,4 +302,38 @@ private void mimicSyncUpAfterPut() throws Exception {
assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
rowCountHt2TargetAtPeer1);
}

/**
* test "start a new ReplicationSyncUp after the previous failed". See HBASE-27623 for details.
*/
@Test
public void testStartANewSyncUpToolAfterFailed() throws Exception {
// Start syncUpTool for the first time with non-force mode,
// let's assume that this will fail in sync data,
// this does not affect our test results
syncUp(UTIL1);
Path rootDir = CommonFSUtils.getRootDir(UTIL1.getConfiguration());
Path syncUpInfoDir = new Path(rootDir, ReplicationSyncUp.INFO_DIR);
Path replicationInfoPath = new Path(syncUpInfoDir, ReplicationSyncUp.INFO_FILE);
FileSystem fs = UTIL1.getTestFileSystem();
assertTrue(fs.exists(replicationInfoPath));
FileStatus fileStatus1 = fs.getFileStatus(replicationInfoPath);

// Start syncUpTool for the second time with non-force mode,
// startup will fail because replication info file already exists
try {
syncUp(UTIL1);
} catch (Exception e) {
assertTrue("e should be a FileAlreadyExistsException",
(e instanceof FileAlreadyExistsException));
}
FileStatus fileStatus2 = fs.getFileStatus(replicationInfoPath);
assertEquals(fileStatus1.getModificationTime(), fileStatus2.getModificationTime());

// Start syncUpTool for the third time with force mode,
// startup will success and create a new replication info file
syncUp(UTIL1, new String[] { "-f" });
FileStatus fileStatus3 = fs.getFileStatus(replicationInfoPath);
assertTrue(fileStatus3.getModificationTime() > fileStatus2.getModificationTime());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,11 @@ final void setupReplication() throws Exception {
}

final void syncUp(HBaseTestingUtil util) throws Exception {
ToolRunner.run(new Configuration(util.getConfiguration()), new ReplicationSyncUp(),
new String[0]);
syncUp(util, new String[0]);
}

final void syncUp(HBaseTestingUtil util, String[] args) throws Exception {
ToolRunner.run(new Configuration(util.getConfiguration()), new ReplicationSyncUp(), args);
}

// Utilities that manager shutdown / restart of source / sink clusters. They take care of
Expand Down

0 comments on commit f78fe59

Please sign in to comment.