Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
Solve #1838, Add search support for actions (#1842)
Browse files Browse the repository at this point in the history
  • Loading branch information
rum2mojito authored and littlezhou committed Jul 18, 2018
1 parent 7232f36 commit 628718e
Show file tree
Hide file tree
Showing 6 changed files with 549 additions and 295 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ public class CmdletManager extends AbstractService {
public static final int TIMEOUT_MULTIPLIER = 100;
public static final int TIMEOUT_MIN_MILLISECOND = 30000;
public static final String TIMEOUTLOG =
"Timeout error occurred for getting this action's status report.";
"Timeout error occurred for getting this action's status report.";
public static final String ACTION_SKIP_LOG =
"The action is not executed because the prior action in the same cmdlet failed.";
"The action is not executed because the prior action in the same cmdlet failed.";

private ScheduledExecutorService executorService;
private CmdletDispatcher dispatcher;
Expand Down Expand Up @@ -113,8 +113,12 @@ public class CmdletManager extends AbstractService {

private long totalScheduled = 0;

private ActionGroup tmpActions = new ActionGroup();

private long timeout;

private ActionGroup cache;

public CmdletManager(ServerContext context) throws IOException {
super(context);

Expand All @@ -130,12 +134,12 @@ public CmdletManager(ServerContext context) throws IOException {
this.cacheCmd = new ConcurrentHashMap<>();
this.fileLocks = new ConcurrentHashMap<>();
this.dispatcher = new CmdletDispatcher(context, this, scheduledCmdlet,
idToLaunchCmdlet, runningCmdlets, schedulers);
idToLaunchCmdlet, runningCmdlets, schedulers);
maxNumPendingCmdlets = context.getConf()
.getInt(SmartConfKeys.SMART_CMDLET_MAX_NUM_PENDING_KEY,
.getInt(SmartConfKeys.SMART_CMDLET_MAX_NUM_PENDING_KEY,
SmartConfKeys.SMART_CMDLET_MAX_NUM_PENDING_DEFAULT);
cacheCmdTh = context.getConf()
.getInt(SmartConfKeys.SMART_CMDLET_CACHE_BATCH,
.getInt(SmartConfKeys.SMART_CMDLET_CACHE_BATCH,
SmartConfKeys.SMART_CMDLET_CACHE_BATCH_DEFAULT);

int reportPeriod = context.getConf().getInt(SmartConfKeys.SMART_STATUS_REPORT_PERIOD_KEY,
Expand Down Expand Up @@ -166,7 +170,7 @@ public void init() throws IOException {
numCmdletsFinished.addAndGet(metaStore.getNumCmdletsInTerminiatedStates());

schedulerServices = AbstractServiceFactory.createActionSchedulerServices(
getContext().getConf(), getContext(), metaStore, false);
getContext().getConf(), getContext(), metaStore, false);

for (ActionSchedulerService s : schedulerServices) {
s.init();
Expand Down Expand Up @@ -227,14 +231,14 @@ private void reloadCmdletsInDB() throws IOException{
* @throws IOException
*/
private void checkActionNames(
CmdletDescriptor cmdletDescriptor) throws IOException {
CmdletDescriptor cmdletDescriptor) throws IOException {
for (int index = 0; index < cmdletDescriptor.getActionSize(); index++) {
if (!ActionRegistry
.registeredAction(cmdletDescriptor.getActionName(index))) {
.registeredAction(cmdletDescriptor.getActionName(index))) {
throw new IOException(
String.format(
"Submit Cmdlet %s error! Action names are not correct!",
cmdletDescriptor));
String.format(
"Submit Cmdlet %s error! Action names are not correct!",
cmdletDescriptor));
}
}
}
Expand All @@ -251,7 +255,7 @@ private void checkActionsOnSubmit(CmdletInfo cmdletInfo,
for (ActionScheduler p : schedulers.get(actionInfo.getActionName())) {
if (!p.onSubmit(actionInfo)) {
throw new IOException(
String.format("Action rejected by scheduler", actionInfo));
String.format("Action rejected by scheduler", actionInfo));
}
}
cmdletInfo.addAction(actionInfo.getActionId());
Expand All @@ -262,10 +266,10 @@ private void checkActionsOnSubmit(CmdletInfo cmdletInfo,
public void start() throws IOException {
LOG.info("Starting ...");
executorService.scheduleAtFixedRate(new CmdletPurgeTask(getContext().getConf()),
10, 5000, TimeUnit.MILLISECONDS);
10, 5000, TimeUnit.MILLISECONDS);
executorService.scheduleAtFixedRate(new ScheduleTask(), 100, 50, TimeUnit.MILLISECONDS);
executorService.scheduleAtFixedRate(new DetectFailedActionTask(), 1000, 5000,
TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS);

for (ActionSchedulerService s : schedulerServices) {
s.start();
Expand Down Expand Up @@ -312,14 +316,14 @@ public long submitCmdlet(CmdletDescriptor cmdletDescriptor) throws IOException {
}
long submitTime = System.currentTimeMillis();
CmdletInfo cmdletInfo =
new CmdletInfo(
maxCmdletId.getAndIncrement(),
cmdletDescriptor.getRuleId(),
CmdletState.PENDING,
cmdletDescriptor.getCmdletString(),
submitTime,
submitTime,
submitTime + cmdletDescriptor.getDeferIntervalMs());
new CmdletInfo(
maxCmdletId.getAndIncrement(),
cmdletDescriptor.getRuleId(),
CmdletState.PENDING,
cmdletDescriptor.getCmdletString(),
submitTime,
submitTime,
submitTime + cmdletDescriptor.getDeferIntervalMs());
List<ActionInfo> actionInfos = createActionInfos(cmdletDescriptor, cmdletInfo.getCid());
// Check action names
checkActionNames(cmdletDescriptor);
Expand All @@ -338,7 +342,7 @@ public long submitCmdlet(CmdletDescriptor cmdletDescriptor) throws IOException {
* @throws IOException
*/
private void syncCmdAction(CmdletInfo cmdletInfo,
List<ActionInfo> actionInfos) throws IOException {
List<ActionInfo> actionInfos) throws IOException {
lockMovefileActionFiles(actionInfos);
LOG.debug("Cache cmd {}", cmdletInfo);
for (ActionInfo actionInfo : actionInfos) {
Expand Down Expand Up @@ -400,25 +404,25 @@ private void batchSyncCmdAction() {
LOG.debug("Number of cmds {} to submit", cmdletInfos.size());
try {
metaStore.insertActions(
actionInfos.toArray(new ActionInfo[actionInfos.size()]));
actionInfos.toArray(new ActionInfo[actionInfos.size()]));
metaStore.insertCmdlets(
cmdletInfos.toArray(new CmdletInfo[cmdletInfos.size()]));
cmdletInfos.toArray(new CmdletInfo[cmdletInfos.size()]));
} catch (MetaStoreException e) {
LOG.error("{} submit to DB error", cmdletInfos, e);
}
}
}

private synchronized Set<String> lockMovefileActionFiles(List<ActionInfo> actionInfos)
throws IOException {
throws IOException {
Map<String, Long> filesToLock = new HashMap<>();
for (ActionInfo info : actionInfos) {
SmartAction action;
try {
action = ActionRegistry.createAction(info.getActionName());
} catch (ActionException e) {
throw new IOException("Failed to create '" + info.getActionName()
+ "' action instance", e);
+ "' action instance", e);
}
if (action instanceof AbstractMoveFileAction) {
Map<String, String> args = info.getArgs();
Expand Down Expand Up @@ -497,7 +501,6 @@ private int scheduleCmdlet() throws IOException {
} else {
continue;
}

try {
if (result == ScheduleResult.SUCCESS) {
idToLaunchCmdlet.put(cmdlet.getCid(), launchCmdlet);
Expand All @@ -523,7 +526,8 @@ private int scheduleCmdlet() throws IOException {
return nScheduled;
}

private ScheduleResult scheduleCmdletActions(CmdletInfo info, LaunchCmdlet launchCmdlet) {
private ScheduleResult scheduleCmdletActions(CmdletInfo info,
LaunchCmdlet launchCmdlet) {
List<Long> actIds = info.getAids();
int idx = 0;
int schIdx = 0;
Expand Down Expand Up @@ -604,7 +608,7 @@ private LaunchCmdlet createLaunchCmdlet(CmdletInfo cmdletInfo) {
args = new HashMap<>();
args.putAll(toLaunch.getArgs());
launchActions.add(
new LaunchAction(toLaunch.getActionId(), toLaunch.getActionName(), args));
new LaunchAction(toLaunch.getActionId(), toLaunch.getActionName(), args));
}
}
return new LaunchCmdlet(cmdletInfo.getCid(), launchActions);
Expand All @@ -623,9 +627,10 @@ public CmdletInfo getCmdletInfo(long cid) throws IOException {
}

public CmdletGroup listCmdletsInfo(long rid, long pageIndex, long numPerPage,
List<String> orderBy, List<Boolean> isDesc) throws IOException, MetaStoreException {
List<String> orderBy,
List<Boolean> isDesc) throws IOException, MetaStoreException {
List<CmdletInfo> cmdlets = metaStore.listPageCmdlets(rid,
(pageIndex - 1) * numPerPage, numPerPage, orderBy, isDesc);
(pageIndex - 1) * numPerPage, numPerPage, orderBy, isDesc);
return new CmdletGroup(cmdlets, metaStore.getNumCmdletsByRid(rid));
}

Expand Down Expand Up @@ -676,7 +681,7 @@ public void disableCmdlet(long cid) throws IOException {
if (idToCmdlets.containsKey(cid)) {
CmdletInfo info = idToCmdlets.get(cid);
onCmdletStatusUpdate(
new CmdletStatus(info.getCid(), System.currentTimeMillis(), CmdletState.DISABLED));
new CmdletStatus(info.getCid(), System.currentTimeMillis(), CmdletState.DISABLED));

synchronized (pendingCmdlet) {
if (pendingCmdlet.contains(cid)) {
Expand Down Expand Up @@ -817,7 +822,8 @@ public List<ActionInfo> listNewCreatedActions(String actionName,
}

public List<ActionInfo> listNewCreatedActions(String actionName,
int actionNum, boolean finished) throws IOException {
int actionNum,
boolean finished) throws IOException {
try {
return metaStore.getNewCreatedActions(actionName, actionNum, finished);
} catch (MetaStoreException e) {
Expand All @@ -827,7 +833,8 @@ public List<ActionInfo> listNewCreatedActions(String actionName,
}

public List<ActionInfo> listNewCreatedActions(String actionName,
boolean successful, int actionNum) throws IOException {
boolean successful,
int actionNum) throws IOException {
try {
return metaStore.getNewCreatedActions(actionName, successful, actionNum);
} catch (MetaStoreException e) {
Expand Down Expand Up @@ -856,7 +863,7 @@ private class DetailedFileActionGroup {
private long totalNumOfActions;

public DetailedFileActionGroup(List<DetailedFileAction> detailedFileActions,
long totalNumOfActions) {
long totalNumOfActions) {
this.detailedFileActions = detailedFileActions;
this.totalNumOfActions = totalNumOfActions;
}
Expand All @@ -876,6 +883,10 @@ private class ActionGroup {
private List<ActionInfo> actions;
private long totalNumOfActions;

public ActionGroup() {
this.totalNumOfActions = 0;
}

public ActionGroup(List<ActionInfo> actions, long totalNumOfActions) {
this.actions = actions;
this.totalNumOfActions = totalNumOfActions;
Expand All @@ -884,6 +895,13 @@ public ActionGroup(List<ActionInfo> actions, long totalNumOfActions) {

public ActionGroup listActions(long pageIndex, long numPerPage,
List<String> orderBy, List<Boolean> isDesc) throws IOException, MetaStoreException {
if (pageIndex == Long.parseLong("0")) {
if (tmpActions.totalNumOfActions != 0) {
return tmpActions;
} else {
pageIndex = 1;
}
}
List<ActionInfo> infos = metaStore.listPageAction((pageIndex - 1) * numPerPage,
numPerPage, orderBy, isDesc);
for (ActionInfo info : infos) {
Expand All @@ -893,8 +911,36 @@ public ActionGroup listActions(long pageIndex, long numPerPage,
info.setProgress(memInfo.getProgress());
}
}
tmpActions = new ActionGroup(infos, metaStore.getCountOfAllAction());
return tmpActions;
}

return new ActionGroup(infos, metaStore.getCountOfAllAction());
public ActionGroup searchAction(String path, long pageIndex, long numPerPage,
List<String> orderBy, List<Boolean> isDesc) throws IOException {
try {
if (pageIndex == Long.parseLong("0")) {
if (tmpActions.totalNumOfActions != 0) {
return tmpActions;
} else {
pageIndex = 1;
}
}
List<ActionInfo> infos = metaStore.searchAction(path, (pageIndex - 1) * numPerPage,
numPerPage, orderBy, isDesc);
for (ActionInfo info : infos) {
LOG.debug("[metaStore search] " + info.getActionName());
ActionInfo memInfo = idToActions.get(info.getActionId());
if (memInfo != null) {
info.setCreateTime(memInfo.getCreateTime());
info.setProgress(memInfo.getProgress());
}
}
tmpActions = new ActionGroup(infos, infos.size());
return tmpActions;
} catch (MetaStoreException e) {
LOG.error("Search [ {} ], Get Finished Actions by search from DB error", path, e);
throw new IOException(e);
}
}

public List<ActionInfo> getActions(List<Long> aids) throws IOException {
Expand All @@ -915,10 +961,12 @@ public List<ActionInfo> getActions(long rid, int size) throws IOException {
}
}

public DetailedFileActionGroup getFileActions(long rid, long pageIndex,
long numPerPage) throws IOException, MetaStoreException {
public DetailedFileActionGroup getFileActions(long rid,
long pageIndex,
long numPerPage)
throws IOException, MetaStoreException {
List<DetailedFileAction> detailedFileActions = metaStore.listFileActions(rid,
(pageIndex - 1) * numPerPage, numPerPage);
(pageIndex - 1) * numPerPage, numPerPage);
return new DetailedFileActionGroup(detailedFileActions, metaStore.getNumFileAction(rid));
}

Expand Down Expand Up @@ -999,7 +1047,7 @@ public void onCmdletStatusUpdate(CmdletStatus status) throws IOException {
}

public void onActionStatusUpdate(ActionStatus status)
throws IOException, ActionException {
throws IOException, ActionException {
if (status == null) {
return;
}
Expand All @@ -1014,7 +1062,7 @@ public void onActionStatusUpdate(ActionStatus status)
actionInfo.setProgress(status.getPercentage());
if (actionInfo.getCreateTime() == 0) {
actionInfo.setCreateTime(
idToCmdlets.get(actionInfo.getCmdletId()).getGenerateTime());
idToCmdlets.get(actionInfo.getCmdletId()).getGenerateTime());
}
actionInfo.setFinishTime(System.currentTimeMillis());
} else {
Expand Down Expand Up @@ -1054,16 +1102,16 @@ private void inferCmdletStatus(ActionInfo actionInfo) throws IOException, Action
if (!actionInfo.isSuccessful()) {
for (int i = index + 1; i < aids.size(); i++) {
ActionStatus actionStatus = new ActionStatus(aids.get(i), ACTION_SKIP_LOG,
actionInfo.getFinishTime(), actionInfo.getFinishTime(), new Throwable(), true);
actionInfo.getFinishTime(), actionInfo.getFinishTime(), new Throwable(), true);
onActionStatusUpdate(actionStatus);
}
CmdletStatus cmdletStatus =
new CmdletStatus(cmdletId, actionInfo.getFinishTime(), CmdletState.FAILED);
new CmdletStatus(cmdletId, actionInfo.getFinishTime(), CmdletState.FAILED);
onCmdletStatusUpdate(cmdletStatus);
} else {
if (index == aids.size() - 1) {
CmdletStatus cmdletStatus =
new CmdletStatus(cmdletId, actionInfo.getFinishTime(), CmdletState.DONE);
new CmdletStatus(cmdletId, actionInfo.getFinishTime(), CmdletState.DONE);
onCmdletStatusUpdate(cmdletStatus);
}
}
Expand Down Expand Up @@ -1092,7 +1140,7 @@ private void updateStorageIfNeeded(ActionInfo info) throws ActionException {
}

protected List<ActionInfo> createActionInfos(CmdletDescriptor cmdletDescriptor, long cid)
throws IOException {
throws IOException {
List<ActionInfo> actionInfos = new ArrayList<>();
for (int index = 0; index < cmdletDescriptor.getActionSize(); index++) {
Map<String, String> args = cmdletDescriptor.getActionArgs(index);
Expand Down Expand Up @@ -1142,13 +1190,13 @@ private class CmdletPurgeTask implements Runnable {

public CmdletPurgeTask(SmartConf conf) throws IOException {
maxNumRecords = conf.getInt(SmartConfKeys.SMART_CMDLET_HIST_MAX_NUM_RECORDS_KEY,
SmartConfKeys.SMART_CMDLET_HIST_MAX_NUM_RECORDS_DEFAULT);
SmartConfKeys.SMART_CMDLET_HIST_MAX_NUM_RECORDS_DEFAULT);
String lifeString = conf.get(SmartConfKeys.SMART_CMDLET_HIST_MAX_RECORD_LIFETIME_KEY,
SmartConfKeys.SMART_CMDLET_HIST_MAX_RECORD_LIFETIME_DEFAULT);
SmartConfKeys.SMART_CMDLET_HIST_MAX_RECORD_LIFETIME_DEFAULT);
maxLifeTime = StringUtil.pharseTimeString(lifeString);
if (maxLifeTime == -1) {
throw new IOException("Invalid value format for configure option. "
+ SmartConfKeys.SMART_CMDLET_HIST_MAX_RECORD_LIFETIME_KEY + "=" + lifeString);
+ SmartConfKeys.SMART_CMDLET_HIST_MAX_RECORD_LIFETIME_KEY + "=" + lifeString);
}
lifeCheckInterval = maxLifeTime / 20 > 5000 ? (maxLifeTime / 20) : 5000;
}
Expand Down Expand Up @@ -1189,7 +1237,7 @@ public void run() {
continue;
}
if (cmdletInfo.getState() == CmdletState.DISPATCHED
|| cmdletInfo.getState() == CmdletState.EXECUTING) {
|| cmdletInfo.getState() == CmdletState.EXECUTING) {
for (long id : cmdletInfo.getAids()) {
ActionInfo actionInfo = idToActions.get(id);
if (isTimeout(actionInfo)) {
Expand All @@ -1200,7 +1248,7 @@ public void run() {
}
long finishTime = System.currentTimeMillis();
ActionStatus actionStatus = new ActionStatus(actionInfo.getActionId(),
TIMEOUTLOG, startTime, finishTime, new Throwable(), true);
TIMEOUTLOG, startTime, finishTime, new Throwable(), true);
onActionStatusUpdate(actionStatus);
}
}
Expand Down
Loading

0 comments on commit 628718e

Please sign in to comment.