Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
Solve #1442, fix and refactor CopyScheduler failover (#1826)
Browse files Browse the repository at this point in the history
* Fix failover in CopyScheduler.
* Refactor onActionFinished in CopyScheduler.
* Fix CopyScheduler UTs.
  • Loading branch information
qiyuangong authored Jun 13, 2018
1 parent 2747298 commit b3e84fa
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class CopyScheduler extends ActionSchedulerService {
// <File path, FileChain object>
private Map<String, ScheduleTask.FileChain> fileDiffChainMap;
// <did, Fail times>
private Map<Long, Integer> fileDiffMap;
private Map<Long, Integer> retryDiffMap;
// BaseSync queue
private Map<String, String> baseSyncQueue;
private Map<String, Boolean> overwriteQueue;
Expand Down Expand Up @@ -105,7 +105,7 @@ public CopyScheduler(SmartContext context, MetaStore metaStore) {
this.fileLock = new ConcurrentHashMap<>();
this.actionDiffMap = new ConcurrentHashMap<>();
this.fileDiffChainMap = new ConcurrentHashMap<>();
this.fileDiffMap = new ConcurrentHashMap<>();
this.retryDiffMap = new ConcurrentHashMap<>();
this.baseSyncQueue = new ConcurrentHashMap<>();
this.overwriteQueue = new ConcurrentHashMap<>();
this.executorService = Executors.newScheduledThreadPool(2);
Expand Down Expand Up @@ -190,7 +190,6 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {
// TODO scope check
String remoteDest = fileDiff.getParameters().get("-dest");
action.getArgs().put("-dest", remoteDest.replaceFirst(srcDir, destDir));
fileDiff.getParameters().remove("-dest");
break;
case METADATA:
action.setActionType("metadata");
Expand All @@ -202,7 +201,6 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {
// Put all parameters into args
action.getArgs().putAll(fileDiff.getParameters());
actionDiffMap.put(actionInfo.getActionId(), did);
fileDiffMap.put(did, 0);
return ScheduleResult.SUCCESS;
}

Expand Down Expand Up @@ -255,53 +253,59 @@ public boolean onSubmit(ActionInfo actionInfo) throws IOException {
public void onActionFinished(ActionInfo actionInfo) {
// Remove lock
FileDiff fileDiff = null;
if (actionInfo.isFinished()) {
try {
long did = actionDiffMap.get(actionInfo.getActionId());
// Remove for action diff map
if (actionDiffMap.containsKey(actionInfo.getActionId())) {
actionDiffMap.remove(actionInfo.getActionId());
}
if (fileDiffCache.containsKey(did)) {
fileDiff = fileDiffCache.get(did);
long did = actionDiffMap.get(actionInfo.getActionId());
// Remove for action diff map
actionDiffMap.remove(actionInfo.getActionId());
fileDiff = fileDiffCache.get(did);
if (fileDiff == null) {
LOG.error("Duplicate sync action->[ {} ] is triggered", did);
return;
}
handleActionResult(actionInfo, fileDiff);
fileLock.remove(fileDiff.getSrc());
}


private void handleActionResult(ActionInfo actionInfo, FileDiff fileDiff) {
long did = fileDiff.getDiffId();
try {
if (actionInfo.isSuccessful()) {
// Remove from chain top
fileDiffChainMap.get(fileDiff.getSrc()).removeHead();
// Update state in cache
updateFileDiffInCache(did, FileDiffState.APPLIED);
// Remove from retry map
retryDiffMap.remove(did);
} else {
// Action failed
if (!retryDiffMap.containsKey(did)) {
// Put diff into retry map
retryDiffMap.put(did, 0);
} else {
LOG.error("Duplicate sync action->[ {} ] is triggered", did);
return;
}
if (fileDiff == null) {
return;
}
if (actionInfo.isSuccessful()) {
if (fileDiffChainMap.containsKey(fileDiff.getSrc())) {
// Remove from chain top
int curr = retryDiffMap.get(did);
if (curr >= retryTh) {
// Action failed several times and exceed threshold
fileDiffChainMap.get(fileDiff.getSrc()).removeHead();
}
//update state in cache
updateFileDiffInCache(did, FileDiffState.APPLIED);
if (fileDiffMap.containsKey(did)) {
fileDiffMap.remove(did);
}
} else {
if (fileDiffMap.containsKey(did)) {
int curr = fileDiffMap.get(did);
if (curr >= retryTh) {
//update state in cache
updateFileDiffInCache(did, FileDiffState.FAILED);
// directSync(fileDiff.getSrc(),
// actionInfo.getArgs().get(SyncAction.SRC),
// actionInfo.getArgs().get(SyncAction.DEST));
} else {
fileDiffMap.put(did, curr + 1);
// Unlock this file for retry
fileLock.remove(fileDiff.getSrc());
}
// Mark diff as failed
updateFileDiffInCache(did, FileDiffState.FAILED);
// Trigger direct sync for this file
// Todo maybe diff affects multiple files
retryDiffMap.remove(did);
// Add to direct Sync queue
baseSyncQueue.put(actionInfo.getArgs().get(SyncAction.SRC),
actionInfo.getArgs().get(SyncAction.DEST));
} else {
retryDiffMap.put(did, curr + 1);
}
}
} catch (MetaStoreException e) {
LOG.error("Mark sync action in metastore failed!", e);
} catch (Exception e) {
LOG.error("Sync action error", e);
}
// Unlock this file for retry and next diff
} catch (MetaStoreException e) {
retryDiffMap.remove(did);
LOG.error("Mark sync action in metastore failed!", e);
} catch (Exception e) {
retryDiffMap.remove(did);
LOG.error("ActionOnFinish: Sync action error", e);
}
}

Expand All @@ -316,7 +320,7 @@ private void batchDirectSync() throws MetaStoreException {
FileDiff fileDiff;
int index = 0;
for (Iterator<Map.Entry<String, String>> it =
baseSyncQueue.entrySet().iterator(); it.hasNext(); ) {
baseSyncQueue.entrySet().iterator(); it.hasNext();) {
if (index >= batchSize) {
break;
}
Expand Down Expand Up @@ -410,6 +414,7 @@ private void baseSync(String srcDir,
}

private FileDiff directSync(String src, String dest) throws MetaStoreException {
// Check if file is in file table
FileInfo fileInfo = metaStore.getFile(src);
if (fileInfo == null) {
// Primary file doesn't exist
Expand Down Expand Up @@ -535,9 +540,7 @@ private void deleteDiffInCache(Long did) {
fileDiffCache.remove(did);
fileDiffCacheChanged.remove(did);
// Remove file lock
if (fileLock.containsKey(fileDiff.getSrc())) {
fileLock.remove(fileDiff.getSrc());
}
fileLock.remove(fileDiff.getSrc());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,11 @@ public void testUpdate() {
Assert.assertTrue(fileDiffDao.getPendingDiff().size() == 1);
fileDiffs[0].getParameters().put("-offset", "0");
fileDiffs[0].setSrc("test1");
fileDiffs[0].setState(FileDiffState.FAILED);
fileDiffs[1].setCreateTime(2);
fileDiffs[1].setRuleId(2);
fileDiffs[1].setDiffType(FileDiffType.RENAME);
fileDiffs[1].setState(FileDiffState.APPLIED);
fileDiffDao.update(fileDiffs);
Assert.assertTrue(fileDiffDao.getById(1).equals(fileDiffs[0]));
Assert.assertTrue(fileDiffDao.getById(2).equals(fileDiffs[1]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,16 @@ public void testDelete() throws Exception {
waitTillSSMExitSafeMode();
MetaStore metaStore = ssm.getMetaStore();
CmdletManager cmdletManager = ssm.getCmdletManager();
DFSTestUtil.createFile(dfs, new Path("/dest/1"),
1024, (short) 1, 1);
SmartAdmin admin = new SmartAdmin(smartContext.getConf());
long ruleId =
admin.submitRule(
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/",
RuleState.ACTIVE);
FileDiff fileDiff = new FileDiff(FileDiffType.DELETE, FileDiffState.PENDING);
fileDiff.setSrc("/src/1");
fileDiff.getParameters().put("-dest", "/dest/1");
metaStore.insertFileDiff(fileDiff);
Thread.sleep(1200);
do {
Expand Down Expand Up @@ -266,6 +269,7 @@ public void testRename() throws Exception {
} while (admin.getRuleInfo(ruleId).getNumCmdsGen() == 0);
Assert.assertTrue(cmdletManager
.listNewCreatedActions("sync", 0).size() > 0);
Assert.assertFalse(dfs.exists(new Path("/dest/1")));
}
@Test
Expand Down Expand Up @@ -408,6 +412,7 @@ public void testCopy() throws Exception {
cmdletManager.submitCmdlet(
"sync -file /src/" + i + " -src " + srcPath + " -dest " + destPath);
}
Thread.sleep(1000);
List<ActionInfo> actionInfos = cmdletManager
.listNewCreatedActions("sync", 0);
Assert.assertTrue(actionInfos.size() >= 3);
Expand Down

0 comments on commit b3e84fa

Please sign in to comment.