Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
Solve #1876, Add host info for actions (#1877)
Browse files Browse the repository at this point in the history
  • Loading branch information
littlezhou authored Aug 2, 2018
1 parent 4b848d5 commit 927d398
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 12 deletions.
11 changes: 11 additions & 0 deletions smart-common/src/main/java/org/smartdata/model/ActionInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class ActionInfo {
private String result;
private String log;

// For action set flexibility
private String execHost;
private boolean successful;
private long createTime;
private boolean finished;
Expand All @@ -52,6 +54,7 @@ public ActionInfo(long actionId, long cmdletId, String actionName,
this.args = args;
this.result = result;
this.log = log;
this.execHost = "";
this.successful = successful;
this.createTime = createTime;
this.finished = finished;
Expand Down Expand Up @@ -159,6 +162,14 @@ public void setProgress(float progress) {
this.progress = progress;
}

public String getExecHost() {
return execHost;
}

public void setExecHost(String execHost) {
this.execHost = execHost;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@
*/
public class NodeInfo {
private String id;
private String location;
private String host;
private int port;
private ExecutorType executorType;

public NodeInfo(String id, String location, ExecutorType executorType) {
this.id = id;
this.location = location;
this.executorType = executorType;
doSetLocation(location);
}

public String getId() {
Expand All @@ -44,11 +45,31 @@ public void setId(String id) {
}

public String getLocation() {
return location;
return host + ":" + port;
}

public void setLocation(String location) {
this.location = location;
doSetLocation(location);
}

private void doSetLocation(String location) {
host = null;
port = 0;
if (location != null) {
String[] its = location.split(":");
if (its.length > 1) {
port = Integer.valueOf(its[1]);
}
host = its[0];
}
}

public String getHost() {
return host;
}

public int getPort() {
return port;
}

public ExecutorType getExecutorType() {
Expand All @@ -57,7 +78,7 @@ public ExecutorType getExecutorType() {

@Override
public String toString() {
return String.format("{id=%s, location=%s, executorType=%s}", id, location, executorType);
return String.format("{id=%s, location=%s, executorType=%s}", id, getLocation(), executorType);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,21 @@ public List<DetailedFileAction> getFileActions(long rid, int size) throws IOExce
}
}

public void updateCmdletExecHost(long cmdletId, String host) throws IOException {
CmdletInfo cmdlet = getCmdletInfo(cmdletId);
if (cmdlet == null) {
return;
}

ActionInfo action;
for (long id : cmdlet.getAids()) {
action = getActionInfo(id);
if (action != null) {
action.setExecHost(host);
}
}
}

/**
* Delete all cmdlets related with rid.
* @param rid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.smartdata.model.action.ActionScheduler;
import org.smartdata.protocol.message.ActionStatus;
import org.smartdata.protocol.message.CmdletStatus;
import org.smartdata.server.cluster.NodeInfo;
import org.smartdata.server.engine.CmdletManager;
import org.smartdata.server.engine.cmdlet.message.LaunchCmdlet;
import org.smartdata.server.engine.message.NodeMessage;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class CmdletDispatcher {
private AtomicInteger index = new AtomicInteger(0);

private Map<String, AtomicInteger> regNodes = new HashMap<>();
private Map<String, NodeInfo> regNodeInfos = new HashMap<>();

private List<List<String>> cmdExecSrvNodeIds = new ArrayList<>();
private String[] completeOn = new String[ExecutorType.values().length];
Expand Down Expand Up @@ -158,7 +160,15 @@ public LaunchCmdlet getNextCmdletToRun() throws IOException {
return launchCmdlet;
}

private void updateCmdActionStatus(LaunchCmdlet cmdlet) {
private void updateCmdActionStatus(LaunchCmdlet cmdlet, String host) {
if (cmdletManager != null) {
try {
cmdletManager.updateCmdletExecHost(cmdlet.getCmdletId(), host);
} catch (IOException e) {
// Ignore this
}
}

try {
for (LaunchAction action : cmdlet.getLaunchActions()) {
ActionStatus actionStatus = new ActionStatus(
Expand Down Expand Up @@ -339,14 +349,16 @@ private boolean dispatch(LaunchCmdlet cmdlet) {
return false;
}

updateCmdActionStatus(cmdlet);
NodeInfo nodeInfo = regNodeInfos.get(nodeId);
String host = nodeInfo == null ? "" : nodeInfo.getHost();
updateCmdActionStatus(cmdlet, host);
dispatchedToSrvs.put(cmdlet.getCmdletId(), selected.getExecutorType());

if (logDispResult) {
LOG.info(
String.format(
"Dispatching cmdlet->[%s] to executor service %s : %s",
cmdlet.getCmdletId(), selected.getExecutorType(), nodeId));
cmdlet.getCmdletId(), selected.getExecutorType(), host));
}
return true;
}
Expand Down Expand Up @@ -422,6 +434,7 @@ public void onNodeMessage(NodeMessage msg, boolean isAdd) {
return;
} else {
regNodes.put(nodeId, new AtomicInteger(defaultSlots));
regNodeInfos.put(nodeId, msg.getNodeInfo());
cmdExecSrvNodeIds.get(msg.getNodeInfo().getExecutorType().ordinal()).add(nodeId);
}
} else {
Expand All @@ -430,6 +443,7 @@ public void onNodeMessage(NodeMessage msg, boolean isAdd) {
return;
} else {
regNodes.remove(nodeId);
regNodeInfos.remove(nodeId);
cmdExecSrvNodeIds.get(msg.getNodeInfo().getExecutorType().ordinal()).remove(nodeId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ public List<ActionInfo> searchAction(String path, long start, long offset, List<
+ "OR cid LIKE '%" + path + "%' ESCAPE '/' "
+ "OR args LIKE '%" + path + "%' ESCAPE '/' "
+ "OR result LIKE '%" + path + "%' ESCAPE '/' "
+ "OR exec_host LIKE '%" + path + "%' ESCAPE '/' "
+ "OR progress LIKE '%" + path + "%' ESCAPE '/' "
+ "OR log LIKE '%" + path + "%' ESCAPE '/' "
+ "OR action_name LIKE '%" + path + "%' ESCAPE '/')";
Expand Down Expand Up @@ -309,8 +310,9 @@ public int[] replace(final ActionInfo[] actionInfos) {
+ "create_time, "
+ "finished, "
+ "finish_time, "
+ "exec_host, "
+ "progress)"
+ " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
return jdbcTemplate.batchUpdate(sql,
new BatchPreparedStatementSetter() {
public void setValues(PreparedStatement ps,
Expand All @@ -325,7 +327,8 @@ public void setValues(PreparedStatement ps,
ps.setLong(8, actionInfos[i].getCreateTime());
ps.setBoolean(9, actionInfos[i].isFinished());
ps.setLong(10, actionInfos[i].getFinishTime());
ps.setFloat(11, actionInfos[i].getProgress());
ps.setString(11, actionInfos[i].getExecHost());
ps.setFloat(12, actionInfos[i].getProgress());
}
public int getBatchSize() {
return actionInfos.length;
Expand All @@ -349,6 +352,7 @@ public int[] update(final ActionInfo[] actionInfos) {
+ "create_time = ?, "
+ "finished = ?, "
+ "finish_time = ?, "
+ "exec_host = ?, "
+ "progress = ? "
+ "WHERE aid = ?";
return jdbcTemplate.batchUpdate(sql,
Expand All @@ -361,8 +365,9 @@ public void setValues(PreparedStatement ps,
ps.setLong(4, actionInfos[i].getCreateTime());
ps.setBoolean(5, actionInfos[i].isFinished());
ps.setLong(6, actionInfos[i].getFinishTime());
ps.setFloat(7, actionInfos[i].getProgress());
ps.setLong(8, actionInfos[i].getActionId());
ps.setString(7, actionInfos[i].getExecHost());
ps.setFloat(8, actionInfos[i].getProgress());
ps.setLong(9, actionInfos[i].getActionId());
}

public int getBatchSize() {
Expand Down Expand Up @@ -395,6 +400,7 @@ private Map<String, Object> toMap(ActionInfo actionInfo) {
parameters.put("create_time", actionInfo.getCreateTime());
parameters.put("finished", actionInfo.isFinished());
parameters.put("finish_time", actionInfo.getFinishTime());
parameters.put("exec_host", actionInfo.getExecHost());
parameters.put("progress", actionInfo.getProgress());
return parameters;
}
Expand All @@ -416,6 +422,7 @@ public ActionInfo mapRow(ResultSet resultSet, int i) throws SQLException {
actionInfo.setCreateTime(resultSet.getLong("create_time"));
actionInfo.setFinished(resultSet.getBoolean("finished"));
actionInfo.setFinishTime(resultSet.getLong("finish_time"));
actionInfo.setExecHost(resultSet.getString("exec_host"));
actionInfo.setProgress(resultSet.getFloat("progress"));
return actionInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ public static void initializeDataBase(
+ " create_time bigint(20) NOT NULL,\n"
+ " finished tinyint(4) NOT NULL,\n"
+ " finish_time bigint(20) NOT NULL,\n"
+ " exec_host varchar(255),\n"
+ " progress float NOT NULL\n"
+ ") ;",
"CREATE TABLE file_diff (\n"
Expand Down

0 comments on commit 927d398

Please sign in to comment.