Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
Solve #1678, fix delete failure during DFSIO (#1741)
Browse files Browse the repository at this point in the history
* Add file check for mergedelete in CopySchduler.
  • Loading branch information
qiyuangong authored May 31, 2018
1 parent 798f97d commit d79aca3
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {
String srcDir = action.getArgs().get(SyncAction.SRC);
String path = action.getArgs().get(HdfsAction.FILE_PATH);
String destDir = action.getArgs().get(SyncAction.DEST);
String destPath = path.replace(srcDir, destDir);
// Check again to avoid corner cases
long did = fileDiffChainMap.get(path).getHead();
if (did == -1) {
Expand All @@ -155,7 +156,7 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {
switch (fileDiff.getDiffType()) {
case APPEND:
action.setActionType("copy");
action.getArgs().put("-dest", path.replace(srcDir, destDir));
action.getArgs().put("-dest", destPath);
if (rateLimiter != null) {
String strLen = fileDiff.getParameters().get("-length");
if (strLen != null) {
Expand All @@ -173,19 +174,19 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {
break;
case DELETE:
action.setActionType("delete");
action.getArgs().put(HdfsAction.FILE_PATH, path.replace(srcDir, destDir));
action.getArgs().put(HdfsAction.FILE_PATH, destPath);
break;
case RENAME:
action.setActionType("rename");
action.getArgs().put(HdfsAction.FILE_PATH, path.replace(srcDir, destDir));
action.getArgs().put(HdfsAction.FILE_PATH, destPath);
// TODO scope check
String remoteDest = fileDiff.getParameters().get("-dest");
action.getArgs().put("-dest", remoteDest.replace(srcDir, destDir));
fileDiff.getParameters().remove("-dest");
break;
case METADATA:
action.setActionType("metadata");
action.getArgs().put(HdfsAction.FILE_PATH, path.replace(srcDir, destDir));
action.getArgs().put(HdfsAction.FILE_PATH, destPath);
break;
default:
break;
Expand Down Expand Up @@ -620,6 +621,18 @@ private void unlockFile(long did){
fileLock.remove(diff.getSrc());
}

private boolean fileExistOnStandby(String filePath) {
// TODO Need to be more general to handle failure
try {
// Check if file exists at standby cluster
FileSystem fs = FileSystem.get(URI.create(filePath), conf);
return fs.exists(new Path(filePath));
} catch (IOException e) {
LOG.debug("Fetch remote file status fails!", e);
return false;
}
}

private class ScheduleTask implements Runnable {

private void syncFileDiff() {
Expand Down Expand Up @@ -805,7 +818,7 @@ void mergeDelete(FileDiff fileDiff) throws MetaStoreException {
for (long did : appendChain) {
FileDiff diff = fileDiffCache.get(did);
if (diff.getParameters().containsKey("-offset")) {
if (diff.getParameters().get("-offset").equals("0")) {
if (!isCreate && diff.getParameters().get("-offset").equals("0")) {
isCreate = true;
}
}
Expand All @@ -818,7 +831,13 @@ void mergeDelete(FileDiff fileDiff) throws MetaStoreException {
// Delete raw is enough
fileDiffCacheChanged.put(fileDiff.getDiffId(), true);
}
diffChain.add(fileDiff.getDiffId());
if (fileExistOnStandby(filePath)) {
// Only allow delete when file do exist on remote
diffChain.add(fileDiff.getDiffId());
} else {
// Mark this delete diff as applied
updateFileDiffInCache(fileDiff.getDiffId(), FileDiffState.APPLIED);
}
} else {
updateFileDiffInCache(fileDiff.getDiffId(), FileDiffState.APPLIED);
}
Expand Down
133 changes: 94 additions & 39 deletions smart-server/src/test/java/org/smartdata/server/TestCopyScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package org.smartdata.server;

/*import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Test;
import org.smartdata.admin.SmartAdmin;
Expand All @@ -43,8 +43,8 @@ public class TestCopyScheduler extends MiniSmartClusterHarness {
public void appendMerge() throws Exception {
waitTillSSMExitSafeMode();
MetaStore metaStore = ssm.getMetaStore();
SmartAdmin admin = new SmartAdmin(smartContext.getConf());
CmdletManager cmdletManager = ssm.getCmdletManager();
// SmartAdmin admin = new SmartAdmin(smartContext.getConf());
// CmdletManager cmdletManager = ssm.getCmdletManager();
DistributedFileSystem dfs = cluster.getFileSystem();
final String srcPath = "/src/";
final String destPath = "/dest/";
Expand All @@ -55,7 +55,8 @@ public void appendMerge() throws Exception {
// Write to src
for (int i = 0; i < 3; i++) {
// Create test files
DFSTestUtil.createFile(dfs, new Path(srcPath + i), 1024, (short) 1, 1);
DFSTestUtil.createFile(dfs, new Path(srcPath + i),
1024, (short) 1, 1);
for (int j = 0; j < 10; j++) {
DFSTestUtil.appendFile(dfs, new Path(srcPath + i), 1024);
}
Expand All @@ -71,8 +72,8 @@ public void appendMerge() throws Exception {
public void deleteMerge() throws Exception {
waitTillSSMExitSafeMode();
MetaStore metaStore = ssm.getMetaStore();
SmartAdmin admin = new SmartAdmin(smartContext.getConf());
CmdletManager cmdletManager = ssm.getCmdletManager();
// SmartAdmin admin = new SmartAdmin(smartContext.getConf());
// CmdletManager cmdletManager = ssm.getCmdletManager();
DistributedFileSystem dfs = cluster.getFileSystem();
final String srcPath = "/src/";
final String destPath = "/dest/";
Expand All @@ -83,7 +84,8 @@ public void deleteMerge() throws Exception {
// Write to src
for (int i = 0; i < 3; i++) {
// Create test files
DFSTestUtil.createFile(dfs, new Path(srcPath + i), 1024, (short) 1, 1);
DFSTestUtil.createFile(dfs, new Path(srcPath + i),
1024, (short) 1, 1);
do {
Thread.sleep(500);
} while (!dfs.isFileClosed(new Path(srcPath + i)));
Expand All @@ -106,7 +108,7 @@ public void deleteMerge() throws Exception {
public void renameMerge() throws Exception {
waitTillSSMExitSafeMode();
MetaStore metaStore = ssm.getMetaStore();
SmartAdmin admin = new SmartAdmin(smartContext.getConf());
// SmartAdmin admin = new SmartAdmin(smartContext.getConf());
DistributedFileSystem dfs = cluster.getFileSystem();
final String srcPath = "/src/";
final String destPath = "/dest/";
Expand All @@ -117,8 +119,10 @@ public void renameMerge() throws Exception {
// Write to src
for (int i = 0; i < 3; i++) {
// Create test files
DFSTestUtil.createFile(dfs, new Path(srcPath + i), 1024, (short) 1, 1);
dfs.rename(new Path(srcPath + i), new Path(srcPath + i + 10));
DFSTestUtil.createFile(dfs, new Path(srcPath + i),
1024, (short) 1, 1);
dfs.rename(new Path(srcPath + i),
new Path(srcPath + i + 10));
// Rename target ends with 10
DFSTestUtil.appendFile(dfs, new Path(srcPath + i + 10), 1024);
}
Expand All @@ -138,11 +142,12 @@ public void renameMerge() throws Exception {
public void failRetry() throws Exception {
waitTillSSMExitSafeMode();
MetaStore metaStore = ssm.getMetaStore();
CmdletManager cmdletManager = ssm.getCmdletManager();
// CmdletManager cmdletManager = ssm.getCmdletManager();
SmartAdmin admin = new SmartAdmin(smartContext.getConf());
long ruleId =
admin.submitRule(
"file: every 1s | path matches \"/src/*\"| sync -dest /dest/", RuleState.ACTIVE);
"file: every 1s | path matches \"/src/*\"| sync -dest /dest/",
RuleState.ACTIVE);
FileDiff fileDiff = new FileDiff(FileDiffType.RENAME, FileDiffState.PENDING);
fileDiff.setSrc("/src/1");
fileDiff.getParameters().put("-dest", "/src/2");
Expand Down Expand Up @@ -171,20 +176,23 @@ public void testForceSync() throws Exception {
// Write to src
for (int i = 0; i < 3; i++) {
// Create test files
DFSTestUtil.createFile(dfs, new Path(srcPath + i), 1024, (short) 1, 1);
DFSTestUtil.createFile(dfs, new Path(srcPath + i),
1024, (short) 1, 1);
}
for (int i = 0; i < 3; i++) {
// Create test files
DFSTestUtil.createFile(dfs, new Path(destPath + i + 5), 1024, (short) 1, 1);
DFSTestUtil.createFile(dfs, new Path(destPath + i + 5),
1024, (short) 1, 1);
}
// Clear file diffs
metaStore.deleteAllFileDiff();
// Submit rules and trigger forceSync
long ruleId =
admin.submitRule(
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/", RuleState.ACTIVE);
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/",
RuleState.ACTIVE);
Thread.sleep(1000);
Assert.assertTrue(metaStore.getFileDiffs(FileDiffState.PENDING).size() > 0);
}
Expand All @@ -200,14 +208,17 @@ public void batchSync() throws Exception {
FileInfo fileInfo;
long now = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
fileInfo = new FileInfo(srcPath + i, i, 1024, false, (short)3,
1024, now, now, (short) 1, null, null, (byte)3);
fileInfo = new FileInfo(srcPath + i, i,
1024, false, (short) 3,
1024, now, now, (short) 1,
null, null, (byte) 3);
metaStore.insertFile(fileInfo);
Thread.sleep(100);
}
long ruleId =
admin.submitRule(
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/", RuleState.ACTIVE);
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/",
RuleState.ACTIVE);
Thread.sleep(2200);
do {
Thread.sleep(1000);
Expand All @@ -222,15 +233,17 @@ public void testDelete() throws Exception {
SmartAdmin admin = new SmartAdmin(smartContext.getConf());
long ruleId =
admin.submitRule(
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/", RuleState.ACTIVE);
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/",
RuleState.ACTIVE);
FileDiff fileDiff = new FileDiff(FileDiffType.DELETE, FileDiffState.PENDING);
fileDiff.setSrc("/src/1");
metaStore.insertFileDiff(fileDiff);
Thread.sleep(1200);
do {
Thread.sleep(1000);
} while (admin.getRuleInfo(ruleId).getNumCmdsGen() == 0);
Assert.assertTrue(cmdletManager.listNewCreatedActions("sync", 0).size() > 0);
Assert.assertTrue(cmdletManager
.listNewCreatedActions("sync", 0).size() > 0);
}
@Test(timeout = 60000)
Expand All @@ -241,7 +254,8 @@ public void testRename() throws Exception {
SmartAdmin admin = new SmartAdmin(smartContext.getConf());
long ruleId =
admin.submitRule(
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/", RuleState.ACTIVE);
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/",
RuleState.ACTIVE);
FileDiff fileDiff = new FileDiff(FileDiffType.RENAME, FileDiffState.PENDING);
fileDiff.setSrc("/src/1");
fileDiff.getParameters().put("-dest", "/src/2");
Expand All @@ -250,7 +264,8 @@ public void testRename() throws Exception {
do {
Thread.sleep(1000);
} while (admin.getRuleInfo(ruleId).getNumCmdsGen() == 0);
Assert.assertTrue(cmdletManager.listNewCreatedActions("sync", 0).size() > 0);
Assert.assertTrue(cmdletManager
.listNewCreatedActions("sync", 0).size() > 0);
}
@Test
Expand All @@ -266,10 +281,12 @@ public void testMeta() throws Exception {
dfs.mkdirs(new Path(destPath));
long ruleId =
admin.submitRule(
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/", RuleState.ACTIVE);
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/",
RuleState.ACTIVE);
Thread.sleep(4200);
// Write to src
DFSTestUtil.createFile(dfs, new Path(srcPath + 1), 1024, (short) 1, 1);
DFSTestUtil.createFile(dfs, new Path(srcPath + 1),
1024, (short) 1, 1);
Thread.sleep(1000);
FileDiff fileDiff = new FileDiff(FileDiffType.METADATA, FileDiffState.PENDING);
fileDiff.setSrc("/src/1");
Expand Down Expand Up @@ -306,12 +323,14 @@ public void testCache() throws Exception {
// Write to src
for (int i = 0; i < 3; i++) {
// Create test files
DFSTestUtil.createFile(dfs, new Path(srcPath + i), 1024, (short) 1, 1);
DFSTestUtil.createFile(dfs, new Path(srcPath + i),
1024, (short) 1, 1);
}
do {
Thread.sleep(1000);
} while (admin.getRuleInfo(ruleId).getNumCmdsGen() <= 2);
List<ActionInfo> actionInfos = cmdletManager.listNewCreatedActions("sync", 0);
List<ActionInfo> actionInfos = cmdletManager
.listNewCreatedActions("sync", 0);
Assert.assertTrue(actionInfos.size() >= 3);
Thread.sleep(20000);
}
Expand Down Expand Up @@ -341,12 +360,14 @@ public void testWithSyncRule() throws Exception {
// Write to src
for (int i = 0; i < 3; i++) {
// Create test files
DFSTestUtil.createFile(dfs, new Path(srcPath + i), 1024, (short) 1, 1);
DFSTestUtil.createFile(dfs, new Path(srcPath + i),
1024, (short) 1, 1);
}
do {
Thread.sleep(1000);
} while (admin.getRuleInfo(ruleId).getNumCmdsGen() <= 2);
List<ActionInfo> actionInfos = cmdletManager.listNewCreatedActions("sync", 0);
List<ActionInfo> actionInfos = cmdletManager
.listNewCreatedActions("sync", 0);
Assert.assertTrue(actionInfos.size() >= 3);
do {
Thread.sleep(800);
Expand Down Expand Up @@ -376,7 +397,8 @@ public void testCopy() throws Exception {
// Write to src
for (int i = 0; i < 3; i++) {
// Create test files
DFSTestUtil.createFile(dfs, new Path(srcPath + i), 1024, (short) 1, 1);
DFSTestUtil.createFile(dfs, new Path(srcPath + i),
1024, (short) 1, 1);
}
Thread.sleep(1000);
CmdletManager cmdletManager = ssm.getCmdletManager();
Expand All @@ -386,7 +408,8 @@ public void testCopy() throws Exception {
cmdletManager.submitCmdlet(
"sync -file /src/" + i + " -src " + srcPath + " -dest " + destPath);
}
List<ActionInfo> actionInfos = cmdletManager.listNewCreatedActions("sync", 0);
List<ActionInfo> actionInfos = cmdletManager
.listNewCreatedActions("sync", 0);
Assert.assertTrue(actionInfos.size() >= 3);
do {
Thread.sleep(1000);
Expand All @@ -399,6 +422,45 @@ public void testCopy() throws Exception {
}
}
@Test(timeout = 40000)
public void testEmpyDelete() throws Exception {
// Delete files not exist on standby cluster
waitTillSSMExitSafeMode();
MetaStore metaStore = ssm.getMetaStore();
// metaStore.deleteAllFileDiff();
// metaStore.deleteAllFileInfo();
// metaStore.deleteAllCmdlets();
// metaStore.deleteAllActions();
DistributedFileSystem dfs = cluster.getFileSystem();
final String srcPath = "/src/";
final String destPath = "/dest/";
// Write to src
for (int i = 0; i < 3; i++) {
// Create test files
DFSTestUtil.createFile(dfs, new Path(srcPath + i),
1024, (short) 1, 1);
}
Thread.sleep(500);
BackUpInfo backUpInfo = new BackUpInfo(1L, srcPath, destPath, 100);
metaStore.insertBackUpInfo(backUpInfo);
dfs.mkdirs(new Path(srcPath));
dfs.mkdirs(new Path(destPath));
Thread.sleep(100);
for (int i = 0; i < 3; i++) {
// delete test files on primary cluster
dfs.delete(new Path(srcPath + i), false);
}
Thread.sleep(2000);
CmdletManager cmdletManager = ssm.getCmdletManager();
// Submit sync action
for (int i = 0; i < 3; i++) {
// Create test files
cmdletManager.submitCmdlet(
"sync -file /src/" + i + " -src " + srcPath + " -dest " + destPath);
}
}
@Test(timeout = 40000)
public void testCopyDelete() throws Exception {
waitTillSSMExitSafeMode();
Expand All @@ -417,7 +479,8 @@ public void testCopyDelete() throws Exception {
// Write to src
for (int i = 0; i < 3; i++) {
// Create test files
DFSTestUtil.createFile(dfs, new Path(srcPath + i), 1024, (short) 1, 1);
DFSTestUtil.createFile(dfs, new Path(srcPath + i),
1024, (short) 1, 1);
dfs.delete(new Path(srcPath + i), false);
}
Expand All @@ -429,13 +492,5 @@ public void testCopyDelete() throws Exception {
cmdletManager.submitCmdlet(
"sync -file /src/" + i + " -src " + srcPath + " -dest " + destPath);
}
List<ActionInfo> actionInfos = cmdletManager.listNewCreatedActions("sync", 0);
Assert.assertTrue(actionInfos.size() >= 3);
Thread.sleep(3000);
for (int i = 0; i < 3; i++) {
// Write 10 files
Assert.assertFalse(dfs.exists(new Path(destPath + i)));
System.out.printf("File %d is copied.\n", i);
}
}
}*/

0 comments on commit d79aca3

Please sign in to comment.