Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Solve #1442, fix and refactor CopyScheduler failover #1826

Merged
merged 4 commits into from
Jun 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class CopyScheduler extends ActionSchedulerService {
// <File path, FileChain object>
private Map<String, ScheduleTask.FileChain> fileDiffChainMap;
// <did, number of failed attempts>
private Map<Long, Integer> fileDiffMap;
private Map<Long, Integer> retryDiffMap;
// BaseSync queue
private Map<String, String> baseSyncQueue;
private Map<String, Boolean> overwriteQueue;
Expand Down Expand Up @@ -102,7 +102,7 @@ public CopyScheduler(SmartContext context, MetaStore metaStore) {
this.fileLock = new ConcurrentHashMap<>();
this.actionDiffMap = new ConcurrentHashMap<>();
this.fileDiffChainMap = new ConcurrentHashMap<>();
this.fileDiffMap = new ConcurrentHashMap<>();
this.retryDiffMap = new ConcurrentHashMap<>();
this.baseSyncQueue = new ConcurrentHashMap<>();
this.overwriteQueue = new ConcurrentHashMap<>();
this.executorService = Executors.newSingleThreadScheduledExecutor();
Expand Down Expand Up @@ -182,7 +182,6 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {
// TODO scope check
String remoteDest = fileDiff.getParameters().get("-dest");
action.getArgs().put("-dest", remoteDest.replaceFirst(srcDir, destDir));
fileDiff.getParameters().remove("-dest");
break;
case METADATA:
action.setActionType("metadata");
Expand All @@ -194,7 +193,6 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {
// Put all parameters into args
action.getArgs().putAll(fileDiff.getParameters());
actionDiffMap.put(actionInfo.getActionId(), did);
fileDiffMap.put(did, 0);
return ScheduleResult.SUCCESS;
}

Expand Down Expand Up @@ -247,53 +245,59 @@ public boolean onSubmit(ActionInfo actionInfo) throws IOException {
public void onActionFinished(ActionInfo actionInfo) {
// Remove lock
FileDiff fileDiff = null;
if (actionInfo.isFinished()) {
try {
long did = actionDiffMap.get(actionInfo.getActionId());
// Remove from action diff map
if (actionDiffMap.containsKey(actionInfo.getActionId())) {
actionDiffMap.remove(actionInfo.getActionId());
}
if (fileDiffCache.containsKey(did)) {
fileDiff = fileDiffCache.get(did);
long did = actionDiffMap.get(actionInfo.getActionId());
// Remove from action diff map
actionDiffMap.remove(actionInfo.getActionId());
fileDiff = fileDiffCache.get(did);
if (fileDiff == null) {
LOG.error("Duplicate sync action->[ {} ] is triggered", did);
return;
}
handleActionResult(actionInfo, fileDiff);
fileLock.remove(fileDiff.getSrc());
}


private void handleActionResult(ActionInfo actionInfo, FileDiff fileDiff) {
long did = fileDiff.getDiffId();
try {
if (actionInfo.isSuccessful()) {
// Remove from chain top
fileDiffChainMap.get(fileDiff.getSrc()).removeHead();
// Update state in cache
updateFileDiffInCache(did, FileDiffState.APPLIED);
// Remove from retry map
retryDiffMap.remove(did);
} else {
// Action failed
if (!retryDiffMap.containsKey(did)) {
// Put diff into retry map
retryDiffMap.put(did, 0);
} else {
LOG.error("Duplicate sync action->[ {} ] is triggered", did);
return;
}
if (fileDiff == null) {
return;
}
if (actionInfo.isSuccessful()) {
if (fileDiffChainMap.containsKey(fileDiff.getSrc())) {
// Remove from chain top
int curr = retryDiffMap.get(did);
if (curr >= retryTh) {
// Action failed several times and exceeded the retry threshold
fileDiffChainMap.get(fileDiff.getSrc()).removeHead();
}
//update state in cache
updateFileDiffInCache(did, FileDiffState.APPLIED);
if (fileDiffMap.containsKey(did)) {
fileDiffMap.remove(did);
}
} else {
if (fileDiffMap.containsKey(did)) {
int curr = fileDiffMap.get(did);
if (curr >= retryTh) {
//update state in cache
updateFileDiffInCache(did, FileDiffState.FAILED);
// directSync(fileDiff.getSrc(),
// actionInfo.getArgs().get(SyncAction.SRC),
// actionInfo.getArgs().get(SyncAction.DEST));
} else {
fileDiffMap.put(did, curr + 1);
// Unlock this file for retry
fileLock.remove(fileDiff.getSrc());
}
// Mark diff as failed
updateFileDiffInCache(did, FileDiffState.FAILED);
// Trigger direct sync for this file
// TODO: a diff may affect multiple files
retryDiffMap.remove(did);
// Add to direct Sync queue
baseSyncQueue.put(actionInfo.getArgs().get(SyncAction.SRC),
actionInfo.getArgs().get(SyncAction.DEST));
} else {
retryDiffMap.put(did, curr + 1);
}
}
} catch (MetaStoreException e) {
LOG.error("Mark sync action in metastore failed!", e);
} catch (Exception e) {
LOG.error("Sync action error", e);
}
// Unlock this file for retry and next diff
} catch (MetaStoreException e) {
retryDiffMap.remove(did);
LOG.error("Mark sync action in metastore failed!", e);
} catch (Exception e) {
retryDiffMap.remove(did);
LOG.error("ActionOnFinish: Sync action error", e);
}
}

Expand All @@ -308,7 +312,7 @@ private void batchDirectSync() throws MetaStoreException {
FileDiff fileDiff;
int index = 0;
for (Iterator<Map.Entry<String, String>> it =
baseSyncQueue.entrySet().iterator(); it.hasNext(); ) {
baseSyncQueue.entrySet().iterator(); it.hasNext();) {
if (index >= batchSize) {
break;
}
Expand Down Expand Up @@ -402,6 +406,7 @@ private void baseSync(String srcDir,
}

private FileDiff directSync(String src, String dest) throws MetaStoreException {
// Check if file is in file table
FileInfo fileInfo = metaStore.getFile(src);
if (fileInfo == null) {
// Primary file doesn't exist
Expand Down Expand Up @@ -524,9 +529,7 @@ private void deleteDiffInCache(Long did) {
fileDiffCache.remove(did);
fileDiffCacheChanged.remove(did);
// Remove file lock
if (fileLock.containsKey(fileDiff.getSrc())) {
fileLock.remove(fileDiff.getSrc());
}
fileLock.remove(fileDiff.getSrc());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,11 @@ public void testUpdate() {
Assert.assertTrue(fileDiffDao.getPendingDiff().size() == 1);
fileDiffs[0].getParameters().put("-offset", "0");
fileDiffs[0].setSrc("test1");
fileDiffs[0].setState(FileDiffState.FAILED);
fileDiffs[1].setCreateTime(2);
fileDiffs[1].setRuleId(2);
fileDiffs[1].setDiffType(FileDiffType.RENAME);
fileDiffs[1].setState(FileDiffState.APPLIED);
fileDiffDao.update(fileDiffs);
Assert.assertTrue(fileDiffDao.getById(1).equals(fileDiffs[0]));
Assert.assertTrue(fileDiffDao.getById(2).equals(fileDiffs[1]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,16 @@ public void testDelete() throws Exception {
waitTillSSMExitSafeMode();
MetaStore metaStore = ssm.getMetaStore();
CmdletManager cmdletManager = ssm.getCmdletManager();
DFSTestUtil.createFile(dfs, new Path("/dest/1"),
1024, (short) 1, 1);
SmartAdmin admin = new SmartAdmin(smartContext.getConf());
long ruleId =
admin.submitRule(
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/",
RuleState.ACTIVE);
FileDiff fileDiff = new FileDiff(FileDiffType.DELETE, FileDiffState.PENDING);
fileDiff.setSrc("/src/1");
fileDiff.getParameters().put("-dest", "/dest/1");
metaStore.insertFileDiff(fileDiff);
Thread.sleep(1200);
do {
Expand Down Expand Up @@ -266,6 +269,7 @@ public void testRename() throws Exception {
} while (admin.getRuleInfo(ruleId).getNumCmdsGen() == 0);
Assert.assertTrue(cmdletManager
.listNewCreatedActions("sync", 0).size() > 0);
Assert.assertFalse(dfs.exists(new Path("/dest/1")));
}

@Test
Expand Down Expand Up @@ -408,6 +412,7 @@ public void testCopy() throws Exception {
cmdletManager.submitCmdlet(
"sync -file /src/" + i + " -src " + srcPath + " -dest " + destPath);
}
Thread.sleep(1000);
List<ActionInfo> actionInfos = cmdletManager
.listNewCreatedActions("sync", 0);
Assert.assertTrue(actionInfos.size() >= 3);
Expand Down